# Mimid: Inferring Grammars

* Code for subjects [here](#Our-subject-programs)
* Evaluation starts [here](#Evaluation)
  * The evaluation on specific subjects starts [here](#Subjects)
    * [CGIDecode](#CGIDecode)
    * [Calculator](#Calculator)
    * [MathExpr](#MathExpr)
    * [URLParse](#URLParse)
    * [Netrc](#Netrc)
    * [Microjson](#Microjson)
* Results are [here](#Results)
* Recovering a parse tree from a recognizer is [here](#Using-a-Recognizer-(not-a-Parser))
* Recovering a parse tree from parser combinators is [here](#Parsing-with-Parser-Combinators)
* Recovering a parse tree from a PEG parser is [here](#Parsing-with-PEG-Parser)

Please note that a complete run can take an hour and a half to complete.
We start with a few Jupyter magics that let us specify examples inline; the examples can be turned off if needed for faster execution. Switch `TOP` to `False` if you do not want the examples to run.
```python
TOP = __name__ == '__main__'
```

The magics we use are `%%var` and `%top`. The `%%var` magic lets us specify large strings such as file contents directly without too many escapes. The `%top` magic helps with examples.
```python
from IPython.core.magic import (Magics, magics_class, cell_magic,
                                line_magic, line_cell_magic)

class B(dict):
    def __getattr__(self, name):
        return self.__getitem__(name)

@magics_class
class MyMagics(Magics):
    def __init__(self, shell=None, **kwargs):
        super().__init__(shell=shell, **kwargs)
        self._vars = B()
        shell.user_ns['VARS'] = self._vars

    @cell_magic
    def var(self, line, cell):
        self._vars[line.strip()] = cell.strip()

    @line_cell_magic
    def top(self, line, cell=None):
        if TOP:
            if cell is None:
                cell = line
            ip = get_ipython()
            res = ip.run_cell(cell)

get_ipython().register_magics(MyMagics)
```

```python
import sys
```

Parts of the program, especially the subprocess execution using `do()`, require the new flags in `3.7`. I am not sure if the taints will work on anything above.
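The `B` helper above is just a dict whose keys are also readable as attributes; its behavior can be checked standalone, independent of IPython:

```python
# Attribute access falls through to ordinary dict lookup, so contents
# stored by the %%var magic are reachable as VARS.name.
class B(dict):
    def __getattr__(self, name):
        return self.__getitem__(name)

store = B()
store['Mimid'] = 'Testing Mimid'
print(store.Mimid)  # -> Testing Mimid
```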
```python
%top assert sys.version_info[0:2] == (3, 7)
```

```python
from subprocess import run
```

```python
import os
```

We keep a log of all system commands executed for easier debugging at `./build/do.log`.
```python
import json  # needed by the logging branch below

def do(command, env=None, shell=False, log=False, **args):
    result = run(command, universal_newlines=True, shell=shell,
                 env=dict(os.environ, **({} if env is None else env)),
                 capture_output=True, **args)
    if log:
        with open('build/do.log', 'a+') as f:
            print(json.dumps({'cmd': command, 'env': env,
                              'exitcode': result.returncode}), env, file=f)
    return result
```

```python
import random
```

Try to ensure replicability of measurements.
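The essential behavior of `do()` is its environment merge: the child process sees `os.environ` plus any overrides passed in. A minimal self-contained sketch of that merge, using `sys.executable` for portability (the variable name `MIMID_X` is made up for this illustration):

```python
import os
import sys
from subprocess import run

# Mirrors dict(os.environ, **env) in do(): the child inherits the parent
# environment with the override applied on top.
result = run([sys.executable, '-c', 'import os; print(os.environ["MIMID_X"])'],
             universal_newlines=True, capture_output=True,
             env=dict(os.environ, MIMID_X='42'))
print(result.stdout.strip())  # -> 42
```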
```python
random.seed(0)
```

Note that this notebook was tested on `Debian GNU/Linux 8.10 and 9.9` and `MacOS Mojave 10.14.5`. In particular, I do not know if everything will work on `Windows`.
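Seeding is what makes the measurements repeatable: re-seeding replays the identical pseudo-random sequence, for instance:

```python
import random

random.seed(0)
first = [random.random() for _ in range(3)]

random.seed(0)  # same seed, same sequence
second = [random.random() for _ in range(3)]

print(first == second)  # -> True
```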
```python
import shutil
```

```python
%%top
if shutil.which('lsb_release'):
    res = do(['lsb_release', '-d']).stdout
elif shutil.which('sw_vers'):
    res = do(['sw_vers']).stdout
else:
    assert False
res
```

```python
%top do(['jupyter', '--version']).stdout
```

Our code is based on the utilities provided by the [Fuzzingbook](http://fuzzingbook.org). Note that the measurements on time and precision in the paper were based on Fuzzingbook `0.0.7`. During development, we found a few bugs in Autogram, which we communicated back; this resulted in a new version of Fuzzingbook, `0.8.0`.

The fixed *Autogram* implementation in the *Fuzzingbook* has better precision rates for *Autogram*, and better timing for grammar generation. However, these numbers still fall short of *Mimid* for most grammars. Further, the grammars generated by *Autogram* are still enumerative. That is, rather than producing a context-free grammar, it simply appends input strings as alternates to the `<START>` nonterminal. This again results in bad recall numbers as before. Hence, it does not change our main points. During the remainder of this notebook, we use the `0.8.0` version of the Fuzzingbook.
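The difference can be illustrated with a hypothetical sketch in the Fuzzingbook dict-of-alternatives grammar notation (this is an illustration, not actual Autogram or Mimid output): an enumerative grammar can only replay its training inputs, while a generalizing context-free grammar also derives unseen strings.

```python
# Hypothetical sample inputs, for illustration only.
samples = ['1+2', '3*4']

# Enumerative: each training input becomes one alternative of <START>,
# so the grammar describes exactly the training set (hence poor recall).
enumerative = {'<START>': list(samples)}

# Generalizing: structure is factored into nonterminals, so strings such
# as '7*9' that never occurred in the samples are also derivable.
generalizing = {
    '<START>': ['<digit><op><digit>'],
    '<op>': ['+', '*'],
    '<digit>': [str(i) for i in range(10)],
}

print(sorted(enumerative['<START>']))  # -> ['1+2', '3*4']
```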
First we define `pip_install()`, a helper to silently install required dependencies.
```python
def pip_install(v):
    return do(['pip', 'install', '-qqq', *v.split(' ')]).returncode
```

```python
%top pip_install('fuzzingbook==0.8.0')
```

Our external dependencies other than `fuzzingbook` are as follows.
```python
%top pip_install('astor graphviz scipy')
```

**IMPORTANT:** Restart the jupyter server after installation of dependencies and extensions.
We recommend the following jupyter notebook extensions:
```python
%top pip_install('jupyter_contrib_nbextensions jupyter_nbextensions_configurator')
```

```python
%top do(['jupyter', 'contrib', 'nbextension', 'install', '--user']).returncode
```

```python
def nb_enable(v):
    return do(['jupyter', 'nbextension', 'enable', v]).returncode
```

```python
%top do(['jupyter', 'nbextensions_configurator', 'enable', '--user']).returncode
```

#### Table of contents

Please install this extension. The navigation in the notebook is rather hard without this installed.
```python
%top nb_enable('toc2/main')
```

#### Collapsible headings

Again, do install this extension. This will let you fold away those sections that you do not have an immediate interest in.
```python
%top nb_enable('collapsible_headings/main')
```

#### Execute time

This is not strictly necessary, but can provide a better breakdown than the `timeit` that we use for timing.
```python
%top nb_enable('execute_time/ExecuteTime')
```

#### Code folding

Very helpful for hiding away source contents of libraries that are not for grammar recovery.
```python
%top nb_enable('codefolding/main')
```

To make runs faster, we cache quite a lot of things. Remove `build` if you change code or samples.
```python
%top do(['rm', '-rf', 'build']).returncode
```

As we mentioned before, `%%var` defines a multi-line embedded string that is accessible from Python.
```python
%%var Mimid
# [(
Testing Mimid
# )]
```

```python
%top VARS['Mimid']
```

Note that our taint tracking implementation is incomplete in that only some of the functions in Python are proxied to preserve taints. Hence, we modify the source slightly where necessary to use the proxied functions without affecting the evaluation of the grammar inference algorithm.
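The idea behind character-level taints can be sketched with a toy string class in the spirit of the Fuzzingbook's `tstr` (an illustration only, not Mimid's actual taint machinery): each character remembers its offset in the original input, and proxied operations propagate those offsets.

```python
# Toy illustration: a string carrying per-character input offsets.
class TaintedStr(str):
    def __new__(cls, value, origin=None):
        s = super().__new__(cls, value)
        s.origin = list(range(len(value))) if origin is None else origin
        return s

    def __getitem__(self, key):
        if isinstance(key, slice):
            return TaintedStr(str.__getitem__(self, key), self.origin[key])
        return TaintedStr(str.__getitem__(self, key), [self.origin[key]])

s = TaintedStr('a=1')
print(s[2], s[2].origin)   # the character and where it came from
print(s[1:].origin)        # slices keep their offsets
```

Operations that bypass such proxies (for example, building a fresh `str` via `join()`) drop the offsets, which is why the subject programs below are occasionally adjusted.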
### Calculator.py

This is a really simple calculator written in textbook recursive descent style. Note that I have used `list()` in a few places to help out with taint tracking. This is due to the limitations of my taint tracking prototype. It can be fixed if required by simple AST walkers or better taint trackers.
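As a preview of the output format: `parse_expr` in the cell below returns the number of characters consumed together with a nested list, in which each parenthesized subexpression becomes a sublist. The following repeats the functions from that cell so the example is self-contained:

```python
import string

def is_digit(i):
    return i in list(string.digits)

def parse_num(s, i):
    n = ''
    while s[i:] and is_digit(s[i]):
        n += s[i]
        i = i + 1
    return i, n

def parse_paren(s, i):
    assert s[i] == '('
    i, v = parse_expr(s, i+1)
    if s[i:] == '':
        raise Exception(s, i)
    assert s[i] == ')'
    return i+1, v

def parse_expr(s, i=0):
    expr = []
    is_op = True
    while s[i:]:
        c = s[i]
        if c in list(string.digits):
            if not is_op:
                raise Exception(s, i)
            i, num = parse_num(s, i)
            expr.append(num)
            is_op = False
        elif c in ['+', '-', '*', '/']:
            if is_op:
                raise Exception(s, i)
            expr.append(c)
            is_op = True
            i = i + 1
        elif c == '(':
            if not is_op:
                raise Exception(s, i)
            i, cexpr = parse_paren(s, i)
            expr.append(cexpr)
            is_op = False
        elif c == ')':
            break
        else:
            raise Exception(s, i)
    if is_op:
        raise Exception(s, i)
    return i, expr

print(parse_expr('1+(2*3)'))  # -> (7, ['1', '+', ['2', '*', '3']])
```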
```python
%%var calc_src
# [(
import string

def is_digit(i):
    return i in list(string.digits)  # Need list() for taint tracking in prototype

def parse_num(s, i):
    n = ''
    while s[i:] and is_digit(s[i]):
        n += s[i]
        i = i + 1
    return i, n

def parse_paren(s, i):
    assert s[i] == '('
    i, v = parse_expr(s, i+1)
    if s[i:] == '':
        raise Exception(s, i)
    assert s[i] == ')'
    return i+1, v

def parse_expr(s, i=0):
    expr = []
    is_op = True
    while s[i:]:
        c = s[i]
        if c in list(string.digits):  # Need list() for taint tracking in prototype
            if not is_op:
                raise Exception(s, i)
            i, num = parse_num(s, i)
            expr.append(num)
            is_op = False
        elif c in ['+', '-', '*', '/']:
            if is_op:
                raise Exception(s, i)
            expr.append(c)
            is_op = True
            i = i + 1
        elif c == '(':
            if not is_op:
                raise Exception(s, i)
            i, cexpr = parse_paren(s, i)
            expr.append(cexpr)
            is_op = False
        elif c == ')':
            break
        else:
            raise Exception(s, i)
    if is_op:
        raise Exception(s, i)
    return i, expr

def main(arg):
    return parse_expr(arg)
# )]
```

### Mathexpr.py

Originally from [here](https://github.com/louisfisch/mathematical-expressions-parser).
The mathexpr is much more complicated than our `calculator` and supports advanced functionality such as predefined functions and variables.

```python
%%var mathexpr_src
# [(
import math

_CONSTANTS = {
    'pi' : math.pi,
    'e' : math.e,
    'phi': (1 + 5 ** .5) / 2
}

_FUNCTIONS = {
    'abs': abs,
    'acos': math.acos,
    'asin': math.asin,
    'atan': math.atan,
    'atan2': math.atan2,
    'ceil': math.ceil,
    'cos': math.cos,
    'cosh': math.cosh,
    'degrees': math.degrees,
    'exp': math.exp,
    'fabs': math.fabs,
    'floor': math.floor,
    'fmod': math.fmod,
    'frexp': math.frexp,
    'hypot': math.hypot,
    'ldexp': math.ldexp,
    'log': math.log,
    'log10': math.log10,
    'modf': math.modf,
    'pow': math.pow,
    'radians': math.radians,
    'sin': math.sin,
    'sinh': math.sinh,
    'sqrt': math.sqrt,
    'tan': math.tan,
    'tanh': math.tanh
}

class Parser:
    def __init__(self, string, vars = None):
        self.string = string
        self.index = 0
        self.vars = {} if vars == None else vars.copy()
        for constant in _CONSTANTS.keys():
            if self.vars.get(constant) != None:
                raise Exception("Cannot redefine the value of " + constant)

    def getValue(self):
        value = self.parseExpression()
        self.skipWhitespace()
        if self.hasNext():
            raise Exception(
                "Unexpected character found: '" + self.peek() +
                "' at index " + str(self.index)
            )
        return value

    def peek(self):
        return self.string[self.index:self.index + 1]

    def hasNext(self):
        return self.index < len(self.string)

    def isNext(self, value):
        return self.string[self.index:self.index+len(value)] == value

    def popIfNext(self, value):
        if self.isNext(value):
            self.index += len(value)
            return True
        return False

    def popExpected(self, value):
        if not self.popIfNext(value):
            raise Exception("Expected '" + value + "' at index " + str(self.index))

    def skipWhitespace(self):
        while self.hasNext():
            if self.peek() in list(' \t\n\r'):
                self.index += 1
            else:
                return

    def parseExpression(self):
        return self.parseAddition()

    def parseAddition(self):
        values = [self.parseMultiplication()]
        while True:
            self.skipWhitespace()
            char = self.peek()
            if char == '+':
                self.index += 1
                values.append(self.parseMultiplication())
            elif char == '-':
                self.index += 1
                values.append(-1 * self.parseMultiplication())
            else:
                break
        return sum(values)

    def parseMultiplication(self):
        values = [self.parseParenthesis()]
        while True:
            self.skipWhitespace()
            char = self.peek()
            if char == '*':
                self.index += 1
                values.append(self.parseParenthesis())
            elif char == '/':
                div_index = self.index
                self.index += 1
                denominator = self.parseParenthesis()
                if denominator == 0:
                    raise Exception(
                        "Division by 0 kills baby whales (occured at index " +
                        str(div_index) + ")"
                    )
                values.append(1.0 / denominator)
            else:
                break
        value = 1.0
        for factor in values:
            value *= factor
        return value

    def parseParenthesis(self):
        self.skipWhitespace()
        char = self.peek()
        if char == '(':
            self.index += 1
            value = self.parseExpression()
            self.skipWhitespace()
            if self.peek() != ')':
                raise Exception(
                    "No closing parenthesis found at character " + str(self.index)
                )
            self.index += 1
            return value
        else:
            return self.parseNegative()

    def parseArguments(self):
        args = []
        self.skipWhitespace()
        self.popExpected('(')
        while not self.popIfNext(')'):
            self.skipWhitespace()
            if len(args) > 0:
                self.popExpected(',')
                self.skipWhitespace()
            args.append(self.parseExpression())
            self.skipWhitespace()
        return args

    def parseNegative(self):
        self.skipWhitespace()
        char = self.peek()
        if char == '-':
            self.index += 1
            return -1 * self.parseParenthesis()
        else:
            return self.parseValue()

    def parseValue(self):
        self.skipWhitespace()
        char = self.peek()
        if char in list('0123456789.'):
            return self.parseNumber()
        else:
            return self.parseVariable()

    def parseVariable(self):
        self.skipWhitespace()
        var = []
        while self.hasNext():
            char = self.peek()
            if char.lower() in list('_abcdefghijklmnopqrstuvwxyz0123456789'):
                var.append(char)
                self.index += 1
            else:
                break
        s = ''
        for a in var:
            s += a  # CHANGE from ORIGINAL to preserve taints. We need to taints.w__ join() calls.
        var = s
        function = _FUNCTIONS.get(var.lower())
        if function != None:
            args = self.parseArguments()
            return float(function(*args))
        constant = _CONSTANTS.get(var.lower())
        if constant != None:
            return constant
        value = self.vars.get(var, None)
        if value != None:
            return float(value)
        raise Exception("Unrecognized variable: '" + var + "'")

    def parseNumber(self):
        self.skipWhitespace()
        strValue = ''
        decimal_found = False
        char = ''
        while self.hasNext():
            char = self.peek()
            if char == '.':
                if decimal_found:
                    raise Exception(
                        "Found an extra period in a number at character " +
                        str(self.index) + ". Are you European?"
                    )
                decimal_found = True
                strValue += '.'
            elif char in list('0123456789'):
                strValue += char
            else:
                break
            self.index += 1
        if len(strValue) == 0:
            if char == '':
                raise Exception("Unexpected end found")
            else:
                raise Exception(
                    "I was expecting to find a number at character " +
                    str(self.index) + " but instead I found a '" + char +
                    "'. What's up with that?")
        return float(strValue)

import string

def main(arg):
    p = Parser(arg, {a:ord(a) for a in string.ascii_lowercase if a != 'e'})
    p.getValue()
# )]
```

### Microjson.py

The microjson is a complete pure-Python implementation of a JSON parser, obtained from [here](https://github.com/phensley/microjson). Note that we use `myio`, which is an instrumented version of the original `io`, to preserve taints.

%%var microjson_src# [(# microjson - Minimal JSON parser/emitter for use in standalone scripts.# No warranty. Free to use/modify as you see fit. 
Trades speed for compactness.# Send ideas, bugs, simplifications to http://github.com/phensley# Copyright (c) 2010 Patrick Hensley <spaceboy@indirect.com># stdimport mathimport myio as ioimport types# the '_from_json_number' function returns either float or long.__pychecker__ = 'no-returnvalues'# character classesWS = ' ' # ''.join([' ','\t','\r','\n','\b','\f'])DIGITS = ''.join([str(i) for i in range(0, 10)])NUMSTART = DIGITS + ''.join(['.','-','+'])NUMCHARS = NUMSTART + ''.join(['e','E'])ESC_MAP = {'n':'\n','t':'\t','r':'\r','b':'\b','f':'\f'}REV_ESC_MAP = dict([(_v,_k) for _k,_v in list(ESC_MAP.items())] + [('"','"')])# error messagesE_BYTES = 'input string must be type str containing ASCII or UTF-8 bytes'E_MALF = 'malformed JSON data'E_TRUNC = 'truncated JSON data'E_BOOL = 'expected boolean'E_NULL = 'expected null'E_LITEM = 'expected list item'E_DKEY = 'expected key'E_COMMA = 'missing comma between elements'E_COLON = 'missing colon after key'E_EMPTY = 'found empty string, not valid JSON data'E_BADESC = 'bad escape character found'E_UNSUPP = 'unsupported type "%s" cannot be JSON-encoded'E_BADFLOAT = 'cannot emit floating point value "%s"'NEG_INF = float('-inf')POS_INF = float('inf')class JSONError(Exception): def __init__(self, msg, stm=None, pos=0): if stm: msg += ' at position %d, "%s"' % (pos, repr(stm.substr(pos, 32))) Exception.__init__(self, msg)class JSONStream: # no longer inherit directly from StringIO, since we only want to # expose the methods below and not allow direct access to the # underlying stream. 
def __init__(self, data): self._stm = io.StringIO(data) def pos(self): return self._stm.tell() def len(self): return len(self._stm.getvalue()) def getvalue(self): return self._stm.getvalue() def skipspaces(self): "post-cond: read pointer will be over first non-WS char" self._skip(lambda c: not c in WS) def _skip(self, stopcond): while True: c = self.peek() if stopcond(c) or c == '': break self.next() def next(self, size=1): return self._stm.read(size) def next_ord(self): return ord(next(self)) def peek(self): if self.pos == self.len: return self.getvalue()[self.pos:] return self.getvalue()[self.pos] def substr(self, pos, length): return self.getvalue()[pos:pos+length]def _decode_utf8(c0, stm): c0 = ord(c0) r = 0xFFFD # unicode replacement character nc = stm.next_ord # 110yyyyy 10zzzzzz if (c0 & 0xE0) == 0xC0: r = ((c0 & 0x1F) << 6) + (nc() & 0x3F) # 1110xxxx 10yyyyyy 10zzzzzz elif (c0 & 0xF0) == 0xE0: r = ((c0 & 0x0F) << 12) + ((nc() & 0x3F) << 6) + (nc() & 0x3F) # 11110www 10xxxxxx 10yyyyyy 10zzzzzz elif (c0 & 0xF8) == 0xF0: r = ((c0 & 0x07) << 18) + ((nc() & 0x3F) << 12) + \ ((nc() & 0x3F) << 6) + (nc() & 0x3F) return chr(r)def decode_escape(c, stm): # whitespace v = ESC_MAP.get(c, None) if v is not None: return v # plain character elif c != 'u': return c # decode unicode escape \u1234 sv = 12 r = 0 for _ in range(0, 4): r |= int(stm.next(), 16) << sv sv -= 4 return chr(r)def _from_json_string(stm): # skip over '"' stm.next() r = '' while True: c = stm.next() if c == '': raise JSONError(E_TRUNC, stm, stm.pos - 1) elif c == '\\': c = stm.next() r += decode_escape(c, stm) elif c == '"': return r elif c in [str(i) for i in range(127, 256)]: r += _decode_utf8(c, stm) else: r += cdef _from_json_fixed(stm, expected, value, errmsg): off = len(expected) pos = stm.pos res = stm.substr(pos, off) if res == expected: stm.next(off) return res raise JSONError(errmsg, stm, pos)def _from_json_number(stm): # Per rfc 4627 section 2.4 '0' and '0.1' are valid, but '01' and # '01.1' 
are not, presumably since this would be confused with an # octal number. This rule is not enforced. is_float = 0 saw_exp = 0 pos = stm.pos while True: c = stm.peek() if not c: break if not c in NUMCHARS: break elif c == '-' and not saw_exp: pass elif c in '.eE': is_float = 1 if c in 'eE': saw_exp = 1 stm.next() s = stm.substr(pos, stm.pos - pos) if is_float: return s return sdef _from_json_list(stm): # skip over '[' stm.next() result = [] pos = stm.pos comma = False while True: stm.skipspaces() c = stm.peek() if c == '': raise JSONError(E_TRUNC, stm, pos) elif c == ']': stm.next() return result elif c == ',': if not result: raise JSONError(E_TRUNC, stm, pos) if comma: raise JSONError(E_TRUNC, stm, pos) comma = True stm.next() result.append(_from_json_raw(stm)) comma = False continue elif not result: # first item result.append(_from_json_raw(stm)) comma = False continue else: raise JSONError(E_MALF, stm, stm.pos)def _from_json_dict(stm): # skip over '{' stm.next() result = {} expect_key = 1 pos = stm.pos comma = False while True: stm.skipspaces() c = stm.peek() if c == '': raise JSONError(E_TRUNC, stm, pos) # end of dictionary, or next item elif c == '}': if expect_key == 2: raise JSONError(E_TRUNC, stm, pos) stm.next() return result elif c == ',': if not result: raise JSONError(E_TRUNC, stm, pos) if comma: raise JSONError(E_TRUNC, stm, pos) comma = True stm.next() expect_key = 2 continue # parse out a key/value pair elif c == '"': if not expect_key: raise JSONError(E_COMMA, stm, stm.pos) key = _from_json_string(stm) stm.skipspaces() c = stm.next() if c != ':': raise JSONError(E_COLON, stm, stm.pos) stm.skipspaces() val = _from_json_raw(stm) result[key] = val expect_key = 0 comma = False continue # unexpected character in middle of dict raise JSONError(E_MALF, stm, stm.pos)def _from_json_raw(stm): while True: stm.skipspaces() c = stm.peek() if c == '"': return _from_json_string(stm) elif c == '{': return _from_json_dict(stm) elif c == '[': return 
_from_json_list(stm) elif c == 't': return _from_json_fixed(stm, 'true', True, E_BOOL) elif c == 'f': return _from_json_fixed(stm, 'false', False, E_BOOL) elif c == 'n': return _from_json_fixed(stm, 'null', None, E_NULL) elif c in NUMSTART: return _from_json_number(stm) raise JSONError(E_MALF, stm, stm.pos)def from_json(data): """ Converts 'data' which is UTF-8 (or the 7-bit pure ASCII subset) into a Python representation. You must pass bytes to this in a str type, not unicode. """ if not isinstance(data, str): raise JSONError(E_BYTES) if not data: return None stm = JSONStream(data) v = _from_json_raw(stm) c = stm.peek() if c: raise JSONError(E_BYTES) return v# JSON emitterdef _to_json_list(stm, lst): seen = 0 stm.write('[') for elem in lst: if seen: stm.write(',') seen = 1 _to_json_object(stm, elem) stm.write(']')def _to_json_string(stm, buf): stm.write('"') for c in buf: nc = REV_ESC_MAP.get(c, None) if nc: stm.write('\\' + nc) elif ord(c) <= 0x7F: # force ascii stm.write(str(c)) else: stm.write('\\u%04x' % ord(c)) stm.write('"')def _to_json_dict(stm, dct): seen = 0 stm.write('{') for key in list(dct.keys()): if seen: stm.write(',') seen = 1 val = dct[key] if not type(key) in (bytes, str): key = str(key) _to_json_string(stm, key) stm.write(':') _to_json_object(stm, val) stm.write('}')def _to_json_object(stm, obj): if isinstance(obj, (list, tuple)): _to_json_list(stm, obj) elif isinstance(obj, bool): if obj: stm.write('true') else: stm.write('false') elif isinstance(obj, float): # this raises an error for NaN, -inf and inf values if not (NEG_INF < obj < POS_INF): raise JSONError(E_BADFLOAT % obj) stm.write("%s" % obj) elif isinstance(obj, int): stm.write("%d" % obj) elif isinstance(obj, type(None)): stm.write('null') elif isinstance(obj, (bytes, str)): _to_json_string(stm, obj) elif hasattr(obj, 'keys') and hasattr(obj, '__getitem__'): _to_json_dict(stm, obj) # fall back to implicit string conversion. 
elif hasattr(obj, '__unicode__'): _to_json_string(stm, obj.__unicode__()) elif hasattr(obj, '__str__'): _to_json_string(stm, obj.__str__()) else: raise JSONError(E_UNSUPP % type(obj))def to_json(obj): """ Converts 'obj' to an ASCII JSON string representation. """ stm = io.StringIO('') _to_json_object(stm, obj) return stm.getvalue()decode = from_jsonencode = to_jsondef main(arg): return from_json(arg)# )]### URLParse.pyThis is the URL parser that is part of the Python distribution. The source was obtained from [here](https://github.com/python/cpython/blob/3.6/Lib/urllib/parse.py).xxxxxxxxxx%%var urlparse_src# [("""Parse (absolute and relative) URLs.urlparse module is based upon the following RFC specifications.RFC 3986 (STD66): "Uniform Resource Identifiers" by T. Berners-Lee, R. Fieldingand L. Masinter, January 2005.RFC 2732 : "Format for Literal IPv6 Addresses in URL's by R.Hinden, B.Carpenterand L.Masinter, December 1999.RFC 2396: "Uniform Resource Identifiers (URI)": Generic Syntax by T.Berners-Lee, R. Fielding, and L. Masinter, August 1998.RFC 2368: "The mailto URL scheme", by P.Hoffman , L Masinter, J. Zawinski, July 1998.RFC 1808: "Relative Uniform Resource Locators", by R. Fielding, UC Irvine, June1995.RFC 1738: "Uniform Resource Locators (URL)" by T. Berners-Lee, L. Masinter, M.McCahill, December 1994RFC 3986 is considered the current standard and any future changes tourlparse module should conform with it. The urlparse module iscurrently not entirely compliant with this RFC due to defactoscenarios for parsing, and for backward compatibility purposes, someparsing quirks from older RFCs are retained. 
The testcases intest_urlparse.py provides a good indicator of parsing behavior."""import reimport sysimport collections__all__ = ["urlparse", "urlunparse", "urljoin", "urldefrag", "urlsplit", "urlunsplit", "urlencode", "parse_qs", "parse_qsl", "quote", "quote_plus", "quote_from_bytes", "unquote", "unquote_plus", "unquote_to_bytes", "DefragResult", "ParseResult", "SplitResult", "DefragResultBytes", "ParseResultBytes", "SplitResultBytes"]# A classification of schemes.# The empty string classifies URLs with no scheme specified,# being the default value returned by "urlsplit" and "urlparse".uses_relative = ['', 'ftp', 'http', 'gopher', 'nntp', 'imap', 'wais', 'file', 'https', 'shttp', 'mms', 'prospero', 'rtsp', 'rtspu', 'sftp', 'svn', 'svn+ssh', 'ws', 'wss']uses_netloc = ['', 'ftp', 'http', 'gopher', 'nntp', 'telnet', 'imap', 'wais', 'file', 'mms', 'https', 'shttp', 'snews', 'prospero', 'rtsp', 'rtspu', 'rsync', 'svn', 'svn+ssh', 'sftp', 'nfs', 'git', 'git+ssh', 'ws', 'wss']uses_params = ['', 'ftp', 'hdl', 'prospero', 'http', 'imap', 'https', 'shttp', 'rtsp', 'rtspu', 'sip', 'sips', 'mms', 'sftp', 'tel']# These are not actually used anymore, but should stay for backwards# compatibility. 
(They are undocumented, but have a public-looking name.)non_hierarchical = ['gopher', 'hdl', 'mailto', 'news', 'telnet', 'wais', 'imap', 'snews', 'sip', 'sips']uses_query = ['', 'http', 'wais', 'imap', 'https', 'shttp', 'mms', 'gopher', 'rtsp', 'rtspu', 'sip', 'sips']uses_fragment = ['', 'ftp', 'hdl', 'http', 'gopher', 'news', 'nntp', 'wais', 'https', 'shttp', 'snews', 'file', 'prospero']# Characters valid in scheme namesscheme_chars = ('abcdefghijklmnopqrstuvwxyz' 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' '0123456789' '+-.')# XXX: Consider replacing with functools.lru_cacheMAX_CACHE_SIZE = 20_parse_cache = {}def clear_cache(): """Clear the parse cache and the quoters cache.""" _parse_cache.clear() _safe_quoters.clear()# Helpers for bytes handling# For 3.2, we deliberately require applications that# handle improperly quoted URLs to do their own# decoding and encoding. If valid use cases are# presented, we may relax this by using latin-1# decoding internally for 3.3_implicit_encoding = 'ascii'_implicit_errors = 'strict'def _noop(obj): return objdef _encode_result(obj, encoding=_implicit_encoding, errors=_implicit_errors): return obj.encode(encoding, errors)def _decode_args(args, encoding=_implicit_encoding, errors=_implicit_errors): return tuple(x.decode(encoding, errors) if x else '' for x in args)def _coerce_args(*args): # Invokes decode if necessary to create str args # and returns the coerced inputs along with # an appropriate result coercion function # - noop for str inputs # - encoding function otherwise str_input = isinstance(args[0], str) for arg in args[1:]: # We special-case the empty string to support the # "scheme=''" default argument to some functions if arg and isinstance(arg, str) != str_input: raise TypeError("Cannot mix str and non-str arguments") if str_input: return args + (_noop,) return _decode_args(args) + (_encode_result,)# Result objects are more helpful than simple tuplesclass _ResultMixinStr(object): """Standard approach to encoding parsed results from 
str to bytes""" __slots__ = () def encode(self, encoding='ascii', errors='strict'): return self._encoded_counterpart(*(x.encode(encoding, errors) for x in self))class _ResultMixinBytes(object): """Standard approach to decoding parsed results from bytes to str""" __slots__ = () def decode(self, encoding='ascii', errors='strict'): return self._decoded_counterpart(*(x.decode(encoding, errors) for x in self))class _NetlocResultMixinBase(object): """Shared methods for the parsed result objects containing a netloc element""" __slots__ = () def username(self): return self._userinfo[0] def password(self): return self._userinfo[1] def hostname(self): hostname = self._hostinfo[0] if not hostname: return None # Scoped IPv6 address may have zone info, which must not be lowercased # like http://[fe80::822a:a8ff:fe49:470c%tESt]:1234/keys separator = '%' if isinstance(hostname, str) else b'%' hostname, percent, zone = hostname.partition(separator) return hostname.lower() + percent + zone def port(self): port = self._hostinfo[1] if port is not None: port = int(port, 10) if not ( 0 <= port <= 65535): raise ValueError("Port out of range 0-65535") return portclass _NetlocResultMixinStr(_NetlocResultMixinBase, _ResultMixinStr): __slots__ = () def _userinfo(self): netloc = self.netloc userinfo, have_info, hostinfo = netloc.rpartition('@') if have_info: username, have_password, password = userinfo.partition(':') if not have_password: password = None else: username = password = None return username, password def _hostinfo(self): netloc = self.netloc _, _, hostinfo = netloc.rpartition('@') _, have_open_br, bracketed = hostinfo.partition('[') if have_open_br: hostname, _, port = bracketed.partition(']') _, _, port = port.partition(':') else: hostname, _, port = hostinfo.partition(':') if not port: port = None return hostname, portclass _NetlocResultMixinBytes(_NetlocResultMixinBase, _ResultMixinBytes): __slots__ = () def _userinfo(self): netloc = self.netloc userinfo, have_info, hostinfo = 
netloc.rpartition(b'@') if have_info: username, have_password, password = userinfo.partition(b':') if not have_password: password = None else: username = password = None return username, password def _hostinfo(self): netloc = self.netloc _, _, hostinfo = netloc.rpartition(b'@') _, have_open_br, bracketed = hostinfo.partition(b'[') if have_open_br: hostname, _, port = bracketed.partition(b']') _, _, port = port.partition(b':') else: hostname, _, port = hostinfo.partition(b':') if not port: port = None return hostname, portfrom collections import namedtuple_DefragResultBase = namedtuple('DefragResult', 'url fragment')_SplitResultBase = namedtuple( 'SplitResult', 'scheme netloc path query fragment')_ParseResultBase = namedtuple( 'ParseResult', 'scheme netloc path params query fragment')_DefragResultBase.__doc__ = """DefragResult(url, fragment)A 2-tuple that contains the url without fragment identifier and the fragmentidentifier as a separate argument."""_DefragResultBase.url.__doc__ = """The URL with no fragment identifier."""_DefragResultBase.fragment.__doc__ = """Fragment identifier separated from URL, that allows indirect identification of asecondary resource by reference to a primary resource and additional identifyinginformation."""_SplitResultBase.__doc__ = """SplitResult(scheme, netloc, path, query, fragment)A 5-tuple that contains the different components of a URL. 
Similar toParseResult, but does not split params."""_SplitResultBase.scheme.__doc__ = """Specifies URL scheme for the request."""_SplitResultBase.netloc.__doc__ = """Network location where the request is made to."""_SplitResultBase.path.__doc__ = """The hierarchical path, such as the path to a file to download."""_SplitResultBase.query.__doc__ = """The query component, that contains non-hierarchical data, that along with datain path component, identifies a resource in the scope of URI's scheme andnetwork location."""_SplitResultBase.fragment.__doc__ = """Fragment identifier, that allows indirect identification of a secondary resourceby reference to a primary resource and additional identifying information."""_ParseResultBase.__doc__ = """ParseResult(scheme, netloc, path, params, query, fragment)A 6-tuple that contains components of a parsed URL."""_ParseResultBase.scheme.__doc__ = _SplitResultBase.scheme.__doc___ParseResultBase.netloc.__doc__ = _SplitResultBase.netloc.__doc___ParseResultBase.path.__doc__ = _SplitResultBase.path.__doc___ParseResultBase.params.__doc__ = """Parameters for last path element used to dereference the URI in order to provideaccess to perform some operation on the resource."""_ParseResultBase.query.__doc__ = _SplitResultBase.query.__doc___ParseResultBase.fragment.__doc__ = _SplitResultBase.fragment.__doc__# For backwards compatibility, alias _NetlocResultMixinStr# ResultBase is no longer part of the documented API, but it is# retained since deprecating it isn't worth the hassleResultBase = _NetlocResultMixinStr# Structured result objects for string dataclass DefragResult(_DefragResultBase, _ResultMixinStr): __slots__ = () def geturl(self): if self.fragment: return self.url + '#' + self.fragment else: return self.urlclass SplitResult(_SplitResultBase, _NetlocResultMixinStr): __slots__ = () def geturl(self): return urlunsplit(self)class ParseResult(_ParseResultBase, _NetlocResultMixinStr): __slots__ = () def geturl(self): return 
urlunparse(self)# Structured result objects for bytes dataclass DefragResultBytes(_DefragResultBase, _ResultMixinBytes): __slots__ = () def geturl(self): if self.fragment: return self.url + b'#' + self.fragment else: return self.urlclass SplitResultBytes(_SplitResultBase, _NetlocResultMixinBytes): __slots__ = () def geturl(self): return urlunsplit(self)class ParseResultBytes(_ParseResultBase, _NetlocResultMixinBytes): __slots__ = () def geturl(self): return urlunparse(self)# Set up the encode/decode result pairsdef _fix_result_transcoding(): _result_pairs = ( (DefragResult, DefragResultBytes), (SplitResult, SplitResultBytes), (ParseResult, ParseResultBytes), ) for _decoded, _encoded in _result_pairs: _decoded._encoded_counterpart = _encoded _encoded._decoded_counterpart = _decoded_fix_result_transcoding()del _fix_result_transcodingdef urlparse(url, scheme='', allow_fragments=True): """Parse a URL into 6 components: <scheme>://<netloc>/<path>;<params>?<query>#<fragment> Return a 6-tuple: (scheme, netloc, path, params, query, fragment). Note that we don't break the components up in smaller bits (e.g. 
netloc is a single string) and we don't expand % escapes.""" url, scheme, _coerce_result = _coerce_args(url, scheme) splitresult = urlsplit(url, scheme, allow_fragments) scheme, netloc, url, query, fragment = splitresult if scheme in uses_params and ';' in url: url, params = _splitparams(url) else: params = '' result = ParseResult(scheme, netloc, url, params, query, fragment) return _coerce_result(result)def _splitparams(url): if '/' in url: i = url.find(';', url.rfind('/')) if i < 0: return url, '' else: i = url.find(';') return url[:i], url[i+1:]def _splitnetloc(url, start=0): delim = len(url) # position of end of domain part of url, default is end for c in '/?#': # look for delimiters; the order is NOT important wdelim = url.find(c, start) # find first of this delim if wdelim >= 0: # if found delim = min(delim, wdelim) # use earliest delim position return url[start:delim], url[delim:] # return (domain, rest)def _checknetloc(netloc): if not netloc or not any(ord(c) > 127 for c in netloc): return # looking for characters like \u2100 that expand to 'a/c' # IDNA uses NFKC equivalence, so normalize for this check import unicodedata n = netloc.rpartition('@')[2] # ignore anything to the left of '@' n = n.replace(':', '') # ignore characters already included n = n.replace('#', '') # but not the surrounding text n = n.replace('?', '') netloc2 = unicodedata.normalize('NFKC', n) if n == netloc2: return for c in '/?#@:': if c in netloc2: raise ValueError("netloc '" + netloc + "' contains invalid " + "characters under NFKC normalization")def urlsplit(url, scheme='', allow_fragments=True): """Parse a URL into 5 components: <scheme>://<netloc>/<path>?<query>#<fragment> Return a 5-tuple: (scheme, netloc, path, query, fragment). Note that we don't break the components up in smaller bits (e.g. 
netloc is a single string) and we don't expand % escapes.""" url, scheme, _coerce_result = _coerce_args(url, scheme) allow_fragments = bool(allow_fragments) key = url, scheme, allow_fragments, type(url), type(scheme) cached = _parse_cache.get(key, None) if cached: return _coerce_result(cached) if len(_parse_cache) >= MAX_CACHE_SIZE: # avoid runaway growth clear_cache() netloc = query = fragment = '' i = url.find(':') if i > 0: if url[:i] == 'http': # optimize the common case scheme = url[:i].lower() url = url[i+1:] if url[:2] == '//': netloc, url = _splitnetloc(url, 2) if (('[' in netloc and ']' not in netloc) or (']' in netloc and '[' not in netloc)): raise ValueError("Invalid IPv6 URL") if allow_fragments and '#' in url: url, fragment = url.split('#', 1) if '?' in url: url, query = url.split('?', 1) _checknetloc(netloc) v = SplitResult(scheme, netloc, url, query, fragment) _parse_cache[key] = v return _coerce_result(v) for c in url[:i]: if c not in scheme_chars: break else: # make sure "url" is not actually a port number (in which case # "scheme" is really part of the path) rest = url[i+1:] if not rest or any(c not in '0123456789' for c in rest): # not a port number scheme, url = url[:i].lower(), rest if url[:2] == '//': netloc, url = _splitnetloc(url, 2) if (('[' in netloc and ']' not in netloc) or (']' in netloc and '[' not in netloc)): raise ValueError("Invalid IPv6 URL") if allow_fragments and '#' in url: url, fragment = url.split('#', 1) if '?' in url: url, query = url.split('?', 1) _checknetloc(netloc) v = SplitResult(scheme, netloc, url, query, fragment) _parse_cache[key] = v return _coerce_result(v)def urlunparse(components): """Put a parsed URL back together again. This may result in a slightly different, but equivalent URL, if the URL that was parsed originally had redundant delimiters, e.g. a ? 
with an empty query (the draft states that these are equivalent).""" scheme, netloc, url, params, query, fragment, _coerce_result = ( _coerce_args(*components)) if params: url = "%s;%s" % (url, params) return _coerce_result(urlunsplit((scheme, netloc, url, query, fragment)))def urlunsplit(components): """Combine the elements of a tuple as returned by urlsplit() into a complete URL as a string. The data argument can be any five-item iterable. This may result in a slightly different, but equivalent URL, if the URL that was parsed originally had unnecessary delimiters (for example, a ? with an empty query; the RFC states that these are equivalent).""" scheme, netloc, url, query, fragment, _coerce_result = ( _coerce_args(*components)) if netloc or (scheme and scheme in uses_netloc and url[:2] != '//'): if url and url[:1] != '/': url = '/' + url url = '//' + (netloc or '') + url if scheme: url = scheme + ':' + url if query: url = url + '?' + query if fragment: url = url + '#' + fragment return _coerce_result(url)def urljoin(base, url, allow_fragments=True): """Join a base URL and a possibly relative URL to form an absolute interpretation of the latter.""" if not base: return url if not url: return base base, url, _coerce_result = _coerce_args(base, url) bscheme, bnetloc, bpath, bparams, bquery, bfragment = \ urlparse(base, '', allow_fragments) scheme, netloc, path, params, query, fragment = \ urlparse(url, bscheme, allow_fragments) if scheme != bscheme or scheme not in uses_relative: return _coerce_result(url) if scheme in uses_netloc: if netloc: return _coerce_result(urlunparse((scheme, netloc, path, params, query, fragment))) netloc = bnetloc if not path and not params: path = bpath params = bparams if not query: query = bquery return _coerce_result(urlunparse((scheme, netloc, path, params, query, fragment))) base_parts = bpath.split('/') if base_parts[-1] != '': # the last item is not a directory, so will not be taken into account # in resolving the relative path del 
base_parts[-1] # for rfc3986, ignore all base path should the first character be root. if path[:1] == '/': segments = path.split('/') else: segments = base_parts + path.split('/') # filter out elements that would cause redundant slashes on re-joining # the resolved_path segments[1:-1] = filter(None, segments[1:-1]) resolved_path = [] for seg in segments: if seg == '..': try: resolved_path.pop() except IndexError: # ignore any .. segments that would otherwise cause an IndexError # when popped from resolved_path if resolving for rfc3986 pass elif seg == '.': continue else: resolved_path.append(seg) if segments[-1] in ('.', '..'): # do some post-processing here. if the last segment was a relative dir, # then we need to append the trailing '/' resolved_path.append('') return _coerce_result(urlunparse((scheme, netloc, '/'.join( resolved_path) or '/', params, query, fragment)))def urldefrag(url): """Removes any existing fragment from URL. Returns a tuple of the defragmented URL and the fragment. If the URL contained no fragments, the second element is the empty string. """ url, _coerce_result = _coerce_args(url) if '#' in url: s, n, p, a, q, frag = urlparse(url) defrag = urlunparse((s, n, p, a, q, '')) else: frag = '' defrag = url return _coerce_result(DefragResult(defrag, frag))_hexdig = '0123456789ABCDEFabcdef'_hextobyte = Nonedef unquote_to_bytes(string): """unquote_to_bytes('abc%20def') -> b'abc def'.""" # Note: strings are encoded as UTF-8. This is only an issue if it contains # unescaped non-ASCII characters, which URIs should not. if not string: # Is it a string-like object? 
string.split return b'' if isinstance(string, str): string = string.encode('utf-8') bits = string.split(b'%') if len(bits) == 1: return string res = [bits[0]] append = res.append # Delay the initialization of the table to not waste memory # if the function is never called global _hextobyte if _hextobyte is None: _hextobyte = {(a + b).encode(): bytes([int(a + b, 16)]) for a in _hexdig for b in _hexdig} for item in bits[1:]: try: append(_hextobyte[item[:2]]) append(item[2:]) except KeyError: append(b'%') append(item) return b''.join(res)_asciire = re.compile('([\x00-\x7f]+)')def unquote(string, encoding='utf-8', errors='replace'): """Replace %xx escapes by their single-character equivalent. The optional encoding and errors parameters specify how to decode percent-encoded sequences into Unicode characters, as accepted by the bytes.decode() method. By default, percent-encoded sequences are decoded with UTF-8, and invalid sequences are replaced by a placeholder character. unquote('abc%20def') -> 'abc def'. """ if '%' not in string: string.split return string if encoding is None: encoding = 'utf-8' if errors is None: errors = 'replace' bits = _asciire.split(string) res = [bits[0]] append = res.append for i in range(1, len(bits), 2): append(unquote_to_bytes(bits[i]).decode(encoding, errors)) append(bits[i + 1]) return ''.join(res)def parse_qs(qs, keep_blank_values=False, strict_parsing=False, encoding='utf-8', errors='replace', max_num_fields=None): """Parse a query given as a string argument. Arguments: qs: percent-encoded query string to be parsed keep_blank_values: flag indicating whether blank values in percent-encoded queries should be treated as blank strings. A true value indicates that blanks should be retained as blank strings. The default false value indicates that blank values are to be ignored and treated as if they were not included. strict_parsing: flag indicating what to do with parsing errors. If false (the default), errors are silently ignored. 
If true, errors raise a ValueError exception. encoding and errors: specify how to decode percent-encoded sequences into Unicode characters, as accepted by the bytes.decode() method. max_num_fields: int. If set, then throws a ValueError if there are more than n fields read by parse_qsl(). Returns a dictionary. """ parsed_result = {} pairs = parse_qsl(qs, keep_blank_values, strict_parsing, encoding=encoding, errors=errors, max_num_fields=max_num_fields) for name, value in pairs: if name in parsed_result: parsed_result[name].append(value) else: parsed_result[name] = [value] return parsed_resultdef parse_qsl(qs, keep_blank_values=False, strict_parsing=False, encoding='utf-8', errors='replace', max_num_fields=None): """Parse a query given as a string argument. Arguments: qs: percent-encoded query string to be parsed keep_blank_values: flag indicating whether blank values in percent-encoded queries should be treated as blank strings. A true value indicates that blanks should be retained as blank strings. The default false value indicates that blank values are to be ignored and treated as if they were not included. strict_parsing: flag indicating what to do with parsing errors. If false (the default), errors are silently ignored. If true, errors raise a ValueError exception. encoding and errors: specify how to decode percent-encoded sequences into Unicode characters, as accepted by the bytes.decode() method. max_num_fields: int. If set, then throws a ValueError if there are more than n fields read by parse_qsl(). Returns a list, as G-d intended. """ qs, _coerce_result = _coerce_args(qs) # If max_num_fields is defined then check that the number of fields # is less than max_num_fields. This prevents a memory exhaustion DOS # attack via post bodies with many fields. 
if max_num_fields is not None: num_fields = 1 + qs.count('&') + qs.count(';') if max_num_fields < num_fields: raise ValueError('Max number of fields exceeded') pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')] r = [] for name_value in pairs: if not name_value and not strict_parsing: continue nv = name_value.split('=', 1) if len(nv) != 2: if strict_parsing: raise ValueError("bad query field: %r" % (name_value,)) # Handle case of a control-name with no equal sign if keep_blank_values: nv.append('') else: continue if len(nv[1]) or keep_blank_values: name = nv[0].replace('+', ' ') name = unquote(name, encoding=encoding, errors=errors) name = _coerce_result(name) value = nv[1].replace('+', ' ') value = unquote(value, encoding=encoding, errors=errors) value = _coerce_result(value) r.append((name, value)) return rdef unquote_plus(string, encoding='utf-8', errors='replace'): """Like unquote(), but also replace plus signs by spaces, as required for unquoting HTML form values. unquote_plus('%7e/abc+def') -> '~/abc def' """ string = string.replace('+', ' ') return unquote(string, encoding, errors)_ALWAYS_SAFE = frozenset(b'ABCDEFGHIJKLMNOPQRSTUVWXYZ' b'abcdefghijklmnopqrstuvwxyz' b'0123456789' b'_.-')_ALWAYS_SAFE_BYTES = bytes(_ALWAYS_SAFE)_safe_quoters = {}class Quoter(collections.defaultdict): """A mapping from bytes (in range(0,256)) to strings. String values are percent-encoded byte values, unless the key < 128, and in the "safe" set (either the specified safe set, or default set). """ # Keeps a cache internally, using defaultdict, for efficiency (lookups # of cached keys don't call Python code at all). def __init__(self, safe): """safe: bytes object.""" self.safe = _ALWAYS_SAFE.union(safe) def __repr__(self): # Without this, will just display as a defaultdict return "<%s %r>" % (self.__class__.__name__, dict(self)) def __missing__(self, b): # Handle a cache miss. Store quoted string in cache and return. 
res = chr(b) if b in self.safe else '%{:02X}'.format(b) self[b] = res return resdef quote(string, safe='/', encoding=None, errors=None): """quote('abc def') -> 'abc%20def' Each part of a URL, e.g. the path info, the query, etc., has a different set of reserved characters that must be quoted. RFC 2396 Uniform Resource Identifiers (URI): Generic Syntax lists the following reserved characters. reserved = ";" | "/" | "?" | ":" | "@" | "&" | "=" | "+" | "$" | "," Each of these characters is reserved in some component of a URL, but not necessarily in all of them. By default, the quote function is intended for quoting the path section of a URL. Thus, it will not encode '/'. This character is reserved, but in typical usage the quote function is being called on a path where the existing slash characters are used as reserved characters. string and safe may be either str or bytes objects. encoding and errors must not be specified if string is a bytes object. The optional encoding and errors parameters specify how to deal with non-ASCII characters, as accepted by the str.encode method. By default, encoding='utf-8' (characters are encoded with UTF-8), and errors='strict' (unsupported characters raise a UnicodeEncodeError). """ if isinstance(string, str): if not string: return string if encoding is None: encoding = 'utf-8' if errors is None: errors = 'strict' string = string.encode(encoding, errors) else: if encoding is not None: raise TypeError("quote() doesn't support 'encoding' for bytes") if errors is not None: raise TypeError("quote() doesn't support 'errors' for bytes") return quote_from_bytes(string, safe)def quote_plus(string, safe='', encoding=None, errors=None): """Like quote(), but also replace ' ' with '+', as required for quoting HTML form values. Plus signs in the original string are escaped unless they are included in safe. It also does not have safe default to '/'. """ # Check if ' ' in string, where string may either be a str or bytes. 
If # there are no spaces, the regular quote will produce the right answer. if ((isinstance(string, str) and ' ' not in string) or (isinstance(string, bytes) and b' ' not in string)): return quote(string, safe, encoding, errors) if isinstance(safe, str): space = ' ' else: space = b' ' string = quote(string, safe + space, encoding, errors) return string.replace(' ', '+')def quote_from_bytes(bs, safe='/'): """Like quote(), but accepts a bytes object rather than a str, and does not perform string-to-bytes encoding. It always returns an ASCII string. quote_from_bytes(b'abc def\x3f') -> 'abc%20def%3f' """ if not isinstance(bs, (bytes, bytearray)): raise TypeError("quote_from_bytes() expected bytes") if not bs: return '' if isinstance(safe, str): # Normalize 'safe' by converting to bytes and removing non-ASCII chars safe = safe.encode('ascii', 'ignore') else: safe = bytes([c for c in safe if c < 128]) if not bs.rstrip(_ALWAYS_SAFE_BYTES + safe): return bs.decode() try: quoter = _safe_quoters[safe] except KeyError: _safe_quoters[safe] = quoter = Quoter(safe).__getitem__ return ''.join([quoter(char) for char in bs])def urlencode(query, doseq=False, safe='', encoding=None, errors=None, quote_via=quote_plus): """Encode a dict or sequence of two-element tuples into a URL query string. If any values in the query arg are sequences and doseq is true, each sequence element is converted to a separate parameter. If the query arg is a sequence of two-element tuples, the order of the parameters in the output will match the order of parameters in the input. The components of a query arg may each be either a string or a bytes type. The safe, encoding, and errors parameters are passed down to the function specified by quote_via (encoding and errors only if a component is a str). """ if hasattr(query, "items"): query = query.items() else: # It's a bother at times that strings and string-like objects are # sequences. 
try: # non-sequence items should not work with len() # non-empty strings will fail this if len(query) and not isinstance(query[0], tuple): raise TypeError # Zero-length sequences of all types will get here and succeed, # but that's a minor nit. Since the original implementation # allowed empty dicts that type of behavior probably should be # preserved for consistency except TypeError: ty, va, tb = sys.exc_info() raise TypeError("not a valid non-string sequence " "or mapping object").with_traceback(tb) l = [] if not doseq: for k, v in query: if isinstance(k, bytes): k = quote_via(k, safe) else: k = quote_via(str(k), safe, encoding, errors) if isinstance(v, bytes): v = quote_via(v, safe) else: v = quote_via(str(v), safe, encoding, errors) l.append(k + '=' + v) else: for k, v in query: if isinstance(k, bytes): k = quote_via(k, safe) else: k = quote_via(str(k), safe, encoding, errors) if isinstance(v, bytes): v = quote_via(v, safe) l.append(k + '=' + v) elif isinstance(v, str): v = quote_via(v, safe, encoding, errors) l.append(k + '=' + v) else: try: # Is this a sufficient test for sequence-ness? x = len(v) except TypeError: # not a sequence v = quote_via(str(v), safe, encoding, errors) l.append(k + '=' + v) else: # loop over the sequence for elt in v: if isinstance(elt, bytes): elt = quote_via(elt, safe) else: elt = quote_via(str(elt), safe, encoding, errors) l.append(k + '=' + elt) return '&'.join(l)def to_bytes(url): """to_bytes(u"URL") --> 'URL'.""" # Most URL schemes require ASCII. If that changes, the conversion # can be relaxed. 
# XXX get rid of to_bytes() if isinstance(url, str): try: url = url.encode("ASCII").decode() except UnicodeError: raise UnicodeError("URL " + repr(url) + " contains non-ASCII characters") return urldef unwrap(url): """unwrap('<URL:type://host/path>') --> 'type://host/path'.""" url = str(url).strip() if url[:1] == '<' and url[-1:] == '>': url = url[1:-1].strip() if url[:4] == 'URL:': url = url[4:].strip() return url_typeprog = Nonedef splittype(url): """splittype('type:opaquestring') --> 'type', 'opaquestring'.""" global _typeprog if _typeprog is None: _typeprog = re.compile('([^/:]+):(.*)', re.DOTALL) match = _typeprog.match(url) if match: scheme, data = match.groups() return scheme.lower(), data return None, url_hostprog = Nonedef splithost(url): """splithost('//host[:port]/path') --> 'host[:port]', '/path'.""" global _hostprog if _hostprog is None: _hostprog = re.compile('//([^/#?]*)(.*)', re.DOTALL) match = _hostprog.match(url) if match: host_port, path = match.groups() if path and path[0] != '/': path = '/' + path return host_port, path return None, urldef splituser(host): """splituser('user[:passwd]@host[:port]') --> 'user[:passwd]', 'host[:port]'.""" user, delim, host = host.rpartition('@') return (user if delim else None), hostdef splitpasswd(user): """splitpasswd('user:passwd') -> 'user', 'passwd'.""" user, delim, passwd = user.partition(':') return user, (passwd if delim else None)# splittag('/path#tag') --> '/path', 'tag'_portprog = Nonedef splitport(host): """splitport('host:port') --> 'host', 'port'.""" global _portprog if _portprog is None: _portprog = re.compile('(.*):([0-9]*)$', re.DOTALL) match = _portprog.match(host) if match: host, port = match.groups() if port: return host, port return host, Nonedef splitnport(host, defport=-1): """Split host and port, returning numeric port. Return given default port if no ':' found; defaults to -1. Return numerical port if a valid number are found after ':'. 
Return None if ':' but not a valid number.""" host, delim, port = host.rpartition(':') if not delim: host = port elif port: try: nport = int(port) except ValueError: nport = None return host, nport return host, defportdef splitquery(url): """splitquery('/path?query') --> '/path', 'query'.""" path, delim, query = url.rpartition('?') if delim: return path, query return url, Nonedef splittag(url): """splittag('/path#tag') --> '/path', 'tag'.""" path, delim, tag = url.rpartition('#') if delim: return path, tag return url, Nonedef splitattr(url): """splitattr('/path;attr1=value1;attr2=value2;...') -> '/path', ['attr1=value1', 'attr2=value2', ...].""" words = url.split(';') return words[0], words[1:]def splitvalue(attr): """splitvalue('attr=value') --> 'attr', 'value'.""" attr, delim, value = attr.partition('=') return attr, (value if delim else None)def main(arg): clear_cache() urlparse(arg)# )]
### Netrc.py
Netrc is the initialization file that is read by web agents such as CURL. Python ships the netrc library in its standard distribution. This was taken from [here](https://github.com/python/cpython/blob/3.6/Lib/netrc.py). Note that we use `mylex` and `myio`, which correspond to `shlex` and `io` from the Python distribution, but instrumented to preserve taints.
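Before the instrumented copy below, it helps to see the `.netrc` format in action. This sketch uses the standard library's own `netrc` module (not the instrumented `mylex`/`myio` version); the machine name and credentials are made-up examples:

```python
# Parse a small .netrc file with the standard library's netrc module.
# 'example.com', 'alice', and 's3cret' are made-up illustration values.
import netrc
import os
import tempfile

content = 'machine example.com\n\tlogin alice\n\tpassword s3cret\n'
fd, path = tempfile.mkstemp()
try:
    with os.fdopen(fd, 'w') as f:
        f.write(content)
    rc = netrc.netrc(path)  # explicit path: no ~/.netrc permission check
    auth = rc.authenticators('example.com')  # (login, account, password)
    print(auth)
finally:
    os.remove(path)
```

The instrumented version in the next cell parses the same token stream, but through the taint-preserving `mylex` lexer.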
xxxxxxxxxx%%var netrc_src# [("""An object-oriented interface to .netrc files."""# Module and documentation by Eric S. Raymond, 21 Dec 1998import os, mylex, statimport myio as io__all__ = ["netrc", "NetrcParseError"]class NetrcParseError(Exception): """Exception raised on syntax errors in the .netrc file.""" def __init__(self, msg, filename=None, lineno=None): self.filename = filename self.lineno = lineno self.msg = msg Exception.__init__(self, msg) def __str__(self): return "%s (%s, line %s)" % (self.msg, self.filename, self.lineno)class netrc: def __init__(self, arg): default_netrc = False self.hosts = {} self.macros = {} self._parse('netrc', arg, default_netrc) def _parse(self, file, fp_, default_netrc): fp = io.StringIO(fp_) lexer = mylex.shlex(fp) lexer.wordchars += r"""!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~""" lexer.commenters = lexer.commenters.replace('#', '') while 1: # Look for a machine, default, or macdef top-level keyword saved_lineno = lexer.lineno toplevel = tt = lexer.get_token() if not tt: break elif tt[0] == '#': if lexer.lineno == saved_lineno and len(tt) == 1: lexer.instream.readline() continue elif tt == 'machine': entryname = lexer.get_token() elif tt == 'default': entryname = 'default' elif tt == 'macdef': # Just skip to end of macdefs entryname = lexer.get_token() self.macros[entryname] = [] lexer.whitespace = ' \t' while 1: line = lexer.instream.readline() if not line or line == '\012': lexer.whitespace = ' \t\r\n' break self.macros[entryname].append(line) continue else: raise NetrcParseError( "bad toplevel token %r" % tt, file, lexer.lineno) # We're looking at start of an entry for a named machine or default. 
login = '' account = password = None self.hosts[entryname] = {} while 1: tt = lexer.get_token() if (tt.startswith('#') or tt in {'', 'machine', 'default', 'macdef'}): if password: self.hosts[entryname] = (login, account, password) lexer.push_token(tt) break else: raise NetrcParseError( "malformed %s entry %s terminated by %s" % (toplevel, entryname, repr(tt)), file, lexer.lineno) elif tt == 'login' or tt == 'user': login = lexer.get_token() elif tt == 'account': account = lexer.get_token() elif tt == 'password': if os.name == 'posix' and default_netrc: prop = os.fstat(fp.fileno()) if prop.st_uid != os.getuid(): import pwd try: fowner = pwd.getpwuid(prop.st_uid)[0] except KeyError: fowner = 'uid %s' % prop.st_uid try: user = pwd.getpwuid(os.getuid())[0] except KeyError: user = 'uid %s' % os.getuid() raise NetrcParseError( ("~/.netrc file owner (%s) does not match" " current user (%s)") % (fowner, user), file, lexer.lineno) if (prop.st_mode & (stat.S_IRWXG | stat.S_IRWXO)): raise NetrcParseError( "~/.netrc access too permissive: access" " permissions must restrict access to only" " the owner", file, lexer.lineno) password = lexer.get_token() else: raise NetrcParseError("bad follower token %r" % tt, file, lexer.lineno) def authenticators(self, host): """Return a (user, account, password) tuple for given host.""" if host in self.hosts: return self.hosts[host] elif 'default' in self.hosts: return self.hosts['default'] else: return None def __repr__(self): """Dump the class data in the format of a .netrc file.""" rep = "" for host in self.hosts.keys(): attrs = self.hosts[host] rep += f"machine {host}\n\tlogin {attrs[0]}\n" if attrs[1]: rep += f"\taccount {attrs[1]}\n" rep += f"\tpassword {attrs[2]}\n" for macro in self.macros.keys(): rep += f"macdef {macro}\n" for line in self.macros[macro]: rep += line rep += "\n" return repdef main(arg): netrc(arg)# )]### CGIDecode.pyThe CGIDecode is a program to decode a URL encoded string. 
The source for this program was obtained from [here](https://www.fuzzingbook.org/html/Coverage.html).
%%var cgidecode_src
# [(
def cgi_decode(s):
    """Decode the CGI-encoded string `s`:
    * replace "+" by " "
    * replace "%xx" by the character with hex number xx.
    Return the decoded string. Raise `ValueError` for invalid inputs."""
    # Mapping of hex digits to their integer values
    hex_values = {
        '0': 0, '1': 1, '2': 2, '3': 3, '4': 4,
        '5': 5, '6': 6, '7': 7, '8': 8, '9': 9,
        'a': 10, 'b': 11, 'c': 12, 'd': 13, 'e': 14, 'f': 15,
        'A': 10, 'B': 11, 'C': 12, 'D': 13, 'E': 14, 'F': 15,
    }
    t = ""
    i = 0
    while i < len(s):
        c = s[i]
        if c == '+':
            t += ' '
        elif c == '%':
            digit_high, digit_low = s[i + 1], s[i + 2]
            i += 2
            if digit_high in hex_values and digit_low in hex_values:
                v = hex_values[digit_high] * 16 + hex_values[digit_low]
                t += chr(v)
            else:
                raise ValueError("Invalid encoding")
        else:
            t += c
        i += 1
    return t
def main(arg):
    cgi_decode(arg)
# )]
### Subject Registry
We store all our subject programs under `program_src`.
# [(
program_src = {
    'calculator.py': VARS['calc_src'],
    'mathexpr.py': VARS['mathexpr_src'],
    'urlparse.py': VARS['urlparse_src'],
    'netrc.py': VARS['netrc_src'],
    'cgidecode.py': VARS['cgidecode_src'],
    'microjson.py': VARS['microjson_src']}
# )]
## Rewriting the source to track control flow and taints.
We rewrite the source so that `astring in value` gets converted to `taint_wrap__(astring).in_(value)`. Note that what we are tracking is not really taints, but rather _character accesses_ to the origin string.
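The `taint_wrap__` wrapper is defined later in the notebook; as a rough illustration of the idea, here is a hypothetical, minimal wrapper whose `in_()` method mirrors `a in lst` while recording which membership checks were made (the real implementation tracks character origins instead; all names here are illustrative):

```python
# Toy sketch of an element-side `in` wrapper. The real taint_wrap__
# records character accesses; this version only logs membership checks.
ACCESS_LOG = []

class TaintedStr(str):
    """A str subclass whose in_() mirrors `self in container`,
    running code on the element instead of on the container."""
    def in_(self, container):
        ACCESS_LOG.append((str(self), list(container)))
        return any(str(self) == str(x) for x in container)

def taint_wrap__(v):
    # Wrap plain values; leave already-wrapped values alone.
    return v if isinstance(v, TaintedStr) else TaintedStr(v)

# `a in lst` rewritten as taint_wrap__(a).in_(lst):
result = taint_wrap__('b').in_(['a', 'b', 'c'])
print(result, ACCESS_LOG)
```

The point of the rewrite is visible here: the membership check now runs a method on the (possibly proxied) element, so the proxy gets a chance to record the access.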
We also rewrite the methods so that method bodies are enclosed in a `method__` context manager, and any `if` conditions and `while` loops (only `while` for now) are enclosed in an outer `stack__` and an inner `scope__` context manager. This lets us track when the corresponding scopes are entered and left.
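The `method__`, `stack__`, and `scope__` context managers are defined later in the notebook. The following toy stand-ins sketch only the idea of recording scope entries and exits; the names of the trace events and the trace format are illustrative, not the real implementation:

```python
# Stand-ins for the notebook's scope-tracking context managers.
# They append enter/exit events to a global trace.
from contextlib import contextmanager

TRACE = []

@contextmanager
def method__(name, args):
    TRACE.append(('enter_method', name))
    try:
        yield
    finally:
        TRACE.append(('exit_method', name))

@contextmanager
def scope__(n):
    TRACE.append(('enter_scope', n))
    try:
        yield
    finally:
        TRACE.append(('exit_scope', n))

def digits(s):
    # What a rewritten method looks like: the body sits inside
    # method__, and each loop-body iteration sits inside scope__.
    with method__('digits', [s]):
        t = ''
        i = 0
        while i < len(s):
            with scope__(0):
                t += s[i]
                i += 1
        return t

out = digits('ab')
print(out, TRACE)
```

Each character comparison made during execution can then be attributed to the method and control-flow scopes active at that moment.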
import ast
import astor
### InRewriter
The `InRewriter` class handles transforming `in` comparisons so that taints can be tracked. It has two methods. The `wrap()` method wraps an expression `a` in a call `taint_wrap__(a)`.
class InRewriter(ast.NodeTransformer):
    def wrap(self, node):
        return ast.Call(func=ast.Name(id='taint_wrap__', ctx=ast.Load()),
                        args=[node], keywords=[])
The `wrap()` method is used internally by the `visit_Compare()` method to transform `a in lst` to `taint_wrap__(a).in_(lst)`. We need to do this because Python ties the overriding of the `in` operator to the `__contains__()` method in the class of `lst`. In our case, however, it is very often `a` that is tainted and hence proxied. Hence we need a method invoked on the `a` object.
class InRewriter(InRewriter):
    def visit_Compare(self, tree_node):
        left = tree_node.left
        if not tree_node.ops or not isinstance(tree_node.ops[0], ast.In):
            return tree_node
        mod_val = ast.Call(
            func=ast.Attribute(
                value=self.wrap(left), attr='in_'),
            args=tree_node.comparators, keywords=[])
        return mod_val
Tying it together.
def rewrite_in(src):
    v = ast.fix_missing_locations(InRewriter().visit(ast.parse(src)))
    source = astor.to_source(v)
    return "%s" % source
from fuzzingbook.fuzzingbook_utils import print_content
%top print_content(rewrite_in('s in ["a", "b", "c"]'))
### Rewriter
The `Rewriter` class handles inserting tracing probes into methods and control structures. Essentially, we insert a `with` scope for the method body, and a `with` scope outside both `while` and `if` scopes. Finally, we insert a `with` scope inside the `while` and `if` scopes. IMPORTANT: we only implement the `while` context; a similar rewrite should be implemented for the `for` context.
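The in-rewriting pass above can also be exercised standalone. This self-contained sketch reproduces the transformer and substitutes `ast.unparse` (Python 3.9+) for `astor`, which plays the same role:

```python
# Self-contained version of the in-rewriting pass, using ast.unparse
# (Python 3.9+) instead of astor to serialize the transformed tree.
import ast

class InRewriter(ast.NodeTransformer):
    def wrap(self, node):
        # a  ->  taint_wrap__(a)
        return ast.Call(func=ast.Name(id='taint_wrap__', ctx=ast.Load()),
                        args=[node], keywords=[])

    def visit_Compare(self, node):
        if not node.ops or not isinstance(node.ops[0], ast.In):
            return node
        # a in lst  ->  taint_wrap__(a).in_(lst)
        return ast.Call(
            func=ast.Attribute(value=self.wrap(node.left),
                               attr='in_', ctx=ast.Load()),
            args=node.comparators, keywords=[])

def rewrite_in(src):
    tree = ast.fix_missing_locations(InRewriter().visit(ast.parse(src)))
    return ast.unparse(tree)

out = rewrite_in("s in ['a', 'b', 'c']")
print(out)  # -> taint_wrap__(s).in_(['a', 'b', 'c'])
```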
A few counters provide unique identifiers for the context managers. Essentially, we number each `if` and `while` that we see.
class Rewriter(InRewriter):
    def init_counters(self):
        self.if_counter = 0
        self.while_counter = 0
The `methods[]` array is used to keep track of the current method stack during execution. `Epsilon` and `NoEpsilon` are simply constants that I use to indicate whether an `if` or a loop is nullable or not. If it is nullable, I mark it with `Epsilon`.
```python
methods = []
Epsilon = '-'
NoEpsilon = '='
```

The `wrap_in_method()` generates a wrapper for method definitions.
```python
class Rewriter(Rewriter):
    def wrap_in_method(self, body, args):
        method_name_expr = ast.Str(methods[-1])
        my_args = ast.List(args.args, ast.Load())
        args = [method_name_expr, my_args]
        scope_expr = ast.Call(func=ast.Name(id='method__', ctx=ast.Load()),
                              args=args, keywords=[])
        return [ast.With(
            items=[ast.withitem(scope_expr, ast.Name(id='_method__'))],
            body=body)]
```

The method `visit_FunctionDef()` is the method rewriter that actually does the job.
```python
class Rewriter(Rewriter):
    def visit_FunctionDef(self, tree_node):
        self.init_counters()
        methods.append(tree_node.name)
        self.generic_visit(tree_node)
        tree_node.body = self.wrap_in_method(tree_node.body, tree_node.args)
        return tree_node
```

The `rewrite_def()` method wraps the function definitions in scopes.
```python
def rewrite_def(src):
    v = ast.fix_missing_locations(Rewriter().visit(ast.parse(src)))
    return astor.to_source(v)
```

We can use it as follows:
```python
%top print_content(rewrite_def('\n'.join(program_src['calculator.py'].split('\n')[12:19])), 'calculator.py')
```

#### The stack wrapper

The method `wrap_in_outer()` adds a `with ..stack..()` context _outside_ the control structures. The stack is used to keep track of the current control structure stack for any character comparison made. Notice the `can_empty` parameter. This indicates that the particular structure is _nullable_. For `if` we can make the decision right away. For `while` we postpone the decision.
```python
class Rewriter(Rewriter):
    def wrap_in_outer(self, name, can_empty, counter, node):
        name_expr = ast.Str(name)
        can_empty_expr = ast.Str(can_empty)
        counter_expr = ast.Num(counter)
        method_id = ast.Name(id='_method__')
        args = [name_expr, counter_expr, method_id, can_empty_expr]
        scope_expr = ast.Call(func=ast.Name(id='stack__', ctx=ast.Load()),
                              args=args, keywords=[])
        return ast.With(
            items=[ast.withitem(scope_expr,
                                ast.Name(id='%s_%d_stack__' % (name, counter)))],
            body=[node])
```

#### The scope wrapper

The method `wrap_in_inner()` adds a `with ...scope..()` context immediately inside the control structure. For `while`, this means simply adding one `with ...scope..()` just before the first line. For `if`, this means adding one `with ...scope...()` to each branch of the `if` condition.
```python
class Rewriter(Rewriter):
    def wrap_in_inner(self, name, counter, val, body):
        val_expr = ast.Num(val)
        stack_iter = ast.Name(id='%s_%d_stack__' % (name, counter))
        method_id = ast.Name(id='_method__')
        args = [val_expr, stack_iter, method_id]
        scope_expr = ast.Call(func=ast.Name(id='scope__', ctx=ast.Load()),
                              args=args, keywords=[])
        return [ast.With(
            items=[ast.withitem(scope_expr,
                                ast.Name(id='%s_%d_%d_scope__' % (name, counter, val)))],
            body=body)]
```

#### Rewriting `if` conditions

While rewriting `if` conditions, we have to take care of cascading `if` conditions (`elif`), which are represented as nested `if` conditions in the AST. They do not require a separate `stack` context, but only separate `scope` contexts.
```python
class Rewriter(Rewriter):
    def process_if(self, tree_node, counter, val=None):
        if val is None:
            val = 0
        else:
            val += 1
        if_body = []
        self.generic_visit(tree_node.test)
        for node in tree_node.body:
            self.generic_visit(node)
        tree_node.body = self.wrap_in_inner('if', counter, val, tree_node.body)
        # else part.
        if len(tree_node.orelse) == 1 and isinstance(tree_node.orelse[0], ast.If):
            self.process_if(tree_node.orelse[0], counter, val)
        else:
            if tree_node.orelse:
                val += 1
                for node in tree_node.orelse:
                    self.generic_visit(node)
                tree_node.orelse = self.wrap_in_inner('if', counter, val,
                                                      tree_node.orelse)
```

While rewriting `if` conditions, we have to take care of cascading `if` conditions, which are represented as nested `if` conditions in the AST. We need to identify whether the cascading `if` conditions (`elif`) have an empty `orelse` clause or not. If the `orelse` is empty, then the entire set of `if` conditions may be excised and still produce a valid value. Hence, it should be marked as optional. The `visit_If()` method checks whether the cascading `if`s have an `orelse` or not.
```python
class Rewriter(Rewriter):
    def visit_If(self, tree_node):
        self.if_counter += 1
        counter = self.if_counter
        # is it empty?
        start = tree_node
        while start:
            if isinstance(start, ast.If):
                if not start.orelse:
                    start = None
                elif len(start.orelse) == 1:
                    start = start.orelse[0]
                else:
                    break
            else:
                break
        self.process_if(tree_node, counter=self.if_counter)
        can_empty = NoEpsilon if start else Epsilon  # NoEpsilon for + and Epsilon for *
        return self.wrap_in_outer('if', can_empty, counter, tree_node)
```

#### Rewriting `while` loops

Rewriting `while` loops is simple. We wrap them in `stack` and `scope` contexts. We do not implement the `orelse` feature yet.
```python
class Rewriter(Rewriter):
    def visit_While(self, tree_node):
        self.generic_visit(tree_node)
        self.while_counter += 1
        counter = self.while_counter
        test = tree_node.test
        body = tree_node.body
        assert not tree_node.orelse
        tree_node.body = self.wrap_in_inner('while', counter, 0, body)
        return self.wrap_in_outer('while', '?', counter, tree_node)
```

```python
def rewrite_cf(src, original):
    v = ast.fix_missing_locations(Rewriter().visit(ast.parse(src)))
    return astor.to_source(v)
```

An example with `if` conditions.
```python
%top print_content('\n'.join(program_src['calculator.py'].split('\n')[12:19]), 'calculator.py')
```

```python
%top print_content(rewrite_cf('\n'.join(program_src['calculator.py'].split('\n')[12:19]), 'calculator.py').strip(), filename='calculator.py')
```

An example with `while` loops.
```python
%top print_content('\n'.join(program_src['calculator.py'].split('\n')[5:11]), 'calculator.py')
```

```python
%top print_content(rewrite_cf('\n'.join(program_src['calculator.py'].split('\n')[5:11]), 'calculator.py'), filename='calculator.py')
```

#### Generating the complete instrumented source

For the complete instrumented source, we need to first make sure that all necessary imports are satisfied. Next, we also need to invoke the parser with the necessary tainted input and output the trace.
```python
def rewrite(src, original):
    tree = ast.fix_missing_locations(InRewriter().visit(ast.parse(src)))
    v = ast.fix_missing_locations(Rewriter().visit(tree))
    header = """
from mimid_context import scope__, stack__, method__
import json
import sys
import taints
from taints import taint_wrap__
"""
    source = astor.to_source(v)
    footer = """
if __name__ == "__main__":
    js = []
    for arg in sys.argv[1:]:
        with open(arg) as f:
            mystring = f.read().strip().replace('\\n', ' ')
        taints.trace_init()
        tainted_input = taints.wrap_input(mystring)
        main(tainted_input)
        assert tainted_input.comparisons
        j = {
            'comparisons_fmt': 'idx, char, method_call_id',
            'comparisons': taints.convert_comparisons(tainted_input.comparisons, mystring),
            'method_map_fmt': 'method_call_id, method_name, children',
            'method_map': taints.convert_method_map(taints.METHOD_MAP),
            'inputstr': mystring,
            'original': %s,
            'arg': arg}
        js.append(j)
    print(json.dumps(js))
"""
    footer = footer % repr(original)
    return "%s\n%s\n%s" % (header, source, footer)
```

```python
%top calc_parse_rewritten = rewrite(program_src['calculator.py'], original='calculator.py')
```

```python
%top print_content(calc_parse_rewritten, filename='calculator.py')
```

We will now write the transformed sources.
```python
do(['mkdir','-p','build','subjects','samples']).returncode
```

```python
for file_name in program_src:
    print(file_name)
    with open("subjects/%s" % file_name, 'wb+') as f:
        f.write(program_src[file_name].encode('utf-8'))
    with open("build/%s" % file_name, 'w+') as f:
        f.write(rewrite(program_src[file_name], file_name))
```

### Context Managers

The context managers are probes inserted into the source code so that we know when execution enters and exits specific control-flow structures such as conditionals and loops. Note that source code for these probes is not really a requirement. They can be inserted directly on binaries too, or even dynamically inserted using tools such as `dtrace`. For now, we make our life simple using AST editing.
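What such probes record can be illustrated without any of the machinery below. This sketch uses a hypothetical `probe` context manager (not the notebook's `method__`/`stack__`/`scope__`) that appends enter/exit events to a global list as nested `with` blocks execute:

```python
# Events recorded by the probes, in execution order.
TRACE = []

class probe:
    # Minimal stand-in for the notebook's probes: log entry and exit of a
    # named control-flow scope.
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        TRACE.append(('enter', self.name))
        return self

    def __exit__(self, *args):
        TRACE.append(('exit', self.name))

def parse_digits(s):
    # One probe outside the loop, one probe per iteration inside it.
    with probe('parse_digits:while_1'):
        i = 0
        while i < len(s) and s[i].isdigit():
            with probe('parse_digits:while_1:iter'):
                i += 1
    return i

parse_digits('12a')
```

Running this on `'12a'` records one enter/exit pair for the loop scope and one pair per iteration (two, for the two digits), which is exactly the nesting information the grammar miner later turns into a tree.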
#### Method context

The `method__` context handles the assignment of method name, as well as storing the method stack.
```python
%%var mimid_method_context
import taints
import urllib.parse

def to_key(method, name, num):
    return '%s:%s_%s' % (method, name, num)

class method__:
    def __init__(self, name, args):
        if not taints.METHOD_NUM_STACK:
            return
        self.args = '_'.join([urllib.parse.quote(i) for i in args if type(i) == str])
        if not self.args:
            self.name = name
        else:
            self.name = "%s__%s" % (name, self.args)  # <- not for now
        # TODO
        if args and hasattr(args[0], 'tag'):
            self.name = "%s:%s" % (args[0].tag, self.name)
        taints.trace_call(self.name)

    def __enter__(self):
        if not taints.METHOD_NUM_STACK:
            return
        taints.trace_set_method(self.name)
        self.stack = []
        return self

    def __exit__(self, *args):
        if not taints.METHOD_NUM_STACK:
            return
        taints.trace_return()
        taints.trace_set_method(self.name)
```

The stack context stores the current prefix and handles updating the stack that is stored at the method context.
```python
%%var mimid_stack_context
class stack__:
    def __init__(self, name, num, method_i, can_empty):
        if not taints.METHOD_NUM_STACK:
            return
        self.method_stack = method_i.stack
        self.can_empty = can_empty  # * means yes, + means no, ? means to be determined
        self.name, self.num, self.method = name, num, method_i.name
        self.prefix = to_key(self.method, self.name, self.num)

    def __enter__(self):
        if not taints.METHOD_NUM_STACK:
            return
        if self.name in {'while'}:
            self.method_stack.append(0)
        elif self.name in {'if'}:
            self.method_stack.append(-1)
        else:
            assert False
        return self

    def __exit__(self, *args):
        if not taints.METHOD_NUM_STACK:
            return
        self.method_stack.pop()
```

#### Scope context

The scope context correctly identifies when the control structure is entered and exited (in the case of loops), and which alternative is entered (in the case of `if` conditions).
```python
%%var mimid_scope_context
import json

class scope__:
    def __init__(self, alt, stack_i, method_i):
        if not taints.METHOD_NUM_STACK:
            return
        self.name, self.num, self.method, self.alt = stack_i.name, stack_i.num, stack_i.method, alt
        self.method_stack = method_i.stack
        self.can_empty = stack_i.can_empty

    def __enter__(self):
        if not taints.METHOD_NUM_STACK:
            return
        if self.name in {'while'}:
            self.method_stack[-1] += 1
        elif self.name in {'if'}:
            pass
        else:
            assert False, self.name
        uid = json.dumps(self.method_stack)
        if self.name in {'while'}:
            taints.trace_call('%s:%s_%s %s %s' % (
                self.method, self.name, self.num, self.can_empty, uid))
        else:
            taints.trace_call('%s:%s_%s %s %s#%s' % (
                self.method, self.name, self.num, self.can_empty, self.alt, uid))
        taints.trace_set_method(self.name)
        return self

    def __exit__(self, *args):
        if not taints.METHOD_NUM_STACK:
            return
        taints.trace_return()
        taints.trace_set_method(self.name)
```

### Taint Tracker

The taint tracker is essentially a reimplementation of the information flow taints from the Fuzzingbook. It incorporates tracing of character accesses. IMPORTANT: Not all methods are implemented.
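The core idea behind the taint tracker (each character carries the index it came from in the original input, and slicing preserves those indices) can be sketched minimally as follows. This is a simplified illustration, not the notebook's `tstr`:

```python
class TaintedStr(str):
    # Minimal sketch of per-character taints: every character remembers
    # its origin index in the original input string.
    def __new__(cls, value, taint=None):
        o = super().__new__(cls, value)
        o.taint = list(range(len(value))) if taint is None else taint
        return o

    def __getitem__(self, key):
        res = super().__getitem__(key)
        if isinstance(key, slice):
            # Slicing the string slices the taint list the same way.
            return TaintedStr(res, self.taint[key])
        return TaintedStr(res, [self.taint[key]])

s = TaintedStr('hello world')
w = s[6:]
# w reads as 'world', but each character still points at indices 6..10
# of the original input.
```

This origin information is what later lets the grammar miner map each character comparison back to a position in the input.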
```python
%%var taints_src
import inspect
import enum

class tstr_(str):
    def __new__(cls, value, *args, **kw):
        return super(tstr_, cls).__new__(cls, value)

class tstr(tstr_):
    def __init__(self, value, taint=None, parent=None, **kwargs):
        self.parent = parent
        l = len(self)
        if taint is None:
            taint = 0
        self.taint = list(range(taint, taint + l)) if isinstance(
            taint, int) else taint
        assert len(self.taint) == l

    def __repr__(self):
        return self

    def __str__(self):
        return str.__str__(self)

class tstr(tstr):
    def untaint(self):
        self.taint = [None] * len(self)
        return self

    def has_taint(self):
        return any(True for i in self.taint if i is not None)

    def taint_in(self, gsentence):
        return set(self.taint) <= set(gsentence.taint)

class tstr(tstr):
    def create(self, res, taint):
        return tstr(res, taint, self)

class tstr(tstr):
    def __getitem__(self, key):
        res = super().__getitem__(key)
        if isinstance(key, int):
            key = len(self) + key if key < 0 else key
            return self.create(res, [self.taint[key]])
        elif isinstance(key, slice):
            return self.create(res, self.taint[key])
        else:
            assert False

class tstr(tstr):
    def __iter__(self):
        return tstr_iterator(self)

class tstr_iterator():
    def __init__(self, tstr):
        self._tstr = tstr
        self._str_idx = 0

    def __next__(self):
        if self._str_idx == len(self._tstr):
            raise StopIteration
        # calls tstr getitem should be tstr
        c = self._tstr[self._str_idx]
        assert isinstance(c, tstr)
        self._str_idx += 1
        return c

class tstr(tstr):
    def __add__(self, other):
        if isinstance(other, tstr):
            return self.create(str.__add__(self, other),
                               (self.taint + other.taint))
        else:
            return self.create(str.__add__(self, other),
                               (self.taint + [-1 for i in other]))

class tstr(tstr):
    def __radd__(self, other):
        if other:
            taint = other.taint if isinstance(other, tstr) else [
                None for i in other]
        else:
            taint = []
        return self.create(str.__add__(other, self), (taint + self.taint))

class tstr(tstr):
    class TaintException(Exception):
        pass

    def x(self, i=0):
        if not self.taint:
            raise tstr.TaintException('Invalid request idx')
        if isinstance(i, int):
            return [self[p] for p in
                    [k for k, j in enumerate(self.taint) if j == i]]
        elif isinstance(i, slice):
            r = range(i.start or 0, i.stop or len(self), i.step or 1)
            return [self[p] for p in
                    [k for k, j in enumerate(self.taint) if j in r]]

class tstr(tstr):
    def replace(self, a, b, n=None):
        old_taint = self.taint
        b_taint = b.taint if isinstance(b, tstr) else [None] * len(b)
        mystr = str(self)
        i = 0
        while True:
            if n and i >= n:
                break
            idx = mystr.find(a)
            if idx == -1:
                break
            last = idx + len(a)
            mystr = mystr.replace(a, b, 1)
            partA, partB = old_taint[0:idx], old_taint[last:]
            old_taint = partA + b_taint + partB
            i += 1
        return self.create(mystr, old_taint)

class tstr(tstr):
    def _split_helper(self, sep, splitted):
        result_list = []
        last_idx = 0
        first_idx = 0
        sep_len = len(sep)
        for s in splitted:
            last_idx = first_idx + len(s)
            item = self[first_idx:last_idx]
            result_list.append(item)
            first_idx = last_idx + sep_len
        return result_list

    def _split_space(self, splitted):
        result_list = []
        last_idx = 0
        first_idx = 0
        sep_len = 0
        for s in splitted:
            last_idx = first_idx + len(s)
            item = self[first_idx:last_idx]
            result_list.append(item)
            v = str(self[last_idx:])
            sep_len = len(v) - len(v.lstrip(' '))
            first_idx = last_idx + sep_len
        return result_list

    def rsplit(self, sep=None, maxsplit=-1):
        splitted = super().rsplit(sep, maxsplit)
        if not sep:
            return self._split_space(splitted)
        return self._split_helper(sep, splitted)

    def split(self, sep=None, maxsplit=-1):
        splitted = super().split(sep, maxsplit)
        if not sep:
            return self._split_space(splitted)
        return self._split_helper(sep, splitted)

class tstr(tstr):
    def strip(self, cl=None):
        return self.lstrip(cl).rstrip(cl)

    def lstrip(self, cl=None):
        res = super().lstrip(cl)
        i = self.find(res)
        return self[i:]

    def rstrip(self, cl=None):
        res = super().rstrip(cl)
        return self[0:len(res)]

class tstr(tstr):
    def expandtabs(self, n=8):
        parts = self.split('\t')
        res = super().expandtabs(n)
        all_parts = []
        for i, p in enumerate(parts):
            all_parts.extend(p.taint)
            if i < len(parts) - 1:
                l = len(all_parts) % n
                all_parts.extend([p.taint[-1]] * l)
        return self.create(res, all_parts)

class tstr(tstr):
    def join(self, iterable):
        mystr = ''
        mytaint = []
        sep_taint = self.taint
        lst = list(iterable)
        for i, s in enumerate(lst):
            staint = s.taint if isinstance(s, tstr) else [None] * len(s)
            mytaint.extend(staint)
            mystr += str(s)
            if i < len(lst) - 1:
                mytaint.extend(sep_taint)
                mystr += str(self)
        res = super().join(iterable)
        assert len(res) == len(mystr)
        return self.create(res, mytaint)

class tstr(tstr):
    def partition(self, sep):
        partA, sep, partB = super().partition(sep)
        return (self.create(partA, self.taint[0:len(partA)]),
                self.create(sep, self.taint[len(partA):len(partA) + len(sep)]),
                self.create(partB, self.taint[len(partA) + len(sep):]))

    def rpartition(self, sep):
        partA, sep, partB = super().rpartition(sep)
        return (self.create(partA, self.taint[0:len(partA)]),
                self.create(sep, self.taint[len(partA):len(partA) + len(sep)]),
                self.create(partB, self.taint[len(partA) + len(sep):]))

class tstr(tstr):
    def ljust(self, width, fillchar=' '):
        res = super().ljust(width, fillchar)
        initial = len(res) - len(self)
        if isinstance(fillchar, tstr):
            t = fillchar.x()
        else:
            t = -1
        return self.create(res, [t] * initial + self.taint)

    def rjust(self, width, fillchar=' '):
        res = super().rjust(width, fillchar)
        final = len(res) - len(self)
        if isinstance(fillchar, tstr):
            t = fillchar.x()
        else:
            t = -1
        return self.create(res, self.taint + [t] * final)

class tstr(tstr):
    def swapcase(self):
        return self.create(str(self).swapcase(), self.taint)

    def upper(self):
        return self.create(str(self).upper(), self.taint)

    def lower(self):
        return self.create(str(self).lower(), self.taint)

    def capitalize(self):
        return self.create(str(self).capitalize(), self.taint)

    def title(self):
        return self.create(str(self).title(), self.taint)

def taint_include(gword, gsentence):
    return set(gword.taint) <= set(gsentence.taint)

def make_str_wrapper(fun):
    def proxy(*args, **kwargs):
        res = fun(*args, **kwargs)
        return res
    return proxy

import types

tstr_members = [name for name, fn in inspect.getmembers(tstr, callable)
                if isinstance(fn, types.FunctionType) and
                fn.__qualname__.startswith('tstr')]

for name, fn in inspect.getmembers(str, callable):
    if name not in set(['__class__', '__new__', '__str__', '__init__',
                        '__repr__', '__getattribute__']) | set(tstr_members):
        setattr(tstr, name, make_str_wrapper(fn))

def make_str_abort_wrapper(fun):
    def proxy(*args, **kwargs):
        raise tstr.TaintException('%s Not implemented in TSTR' % fun.__name__)
    return proxy

class eoftstr(tstr):
    def create(self, res, taint):
        return eoftstr(res, taint, self)

    def __getitem__(self, key):
        def get_interval(key):
            return ((0 if key.start is None else key.start),
                    (len(res) if key.stop is None else key.stop))

        res = super().__getitem__(key)
        if isinstance(key, int):
            key = len(self) + key if key < 0 else key
            return self.create(res, [self.taint[key]])
        elif isinstance(key, slice):
            if res:
                return self.create(res, self.taint[key])
            # Result is an empty string
            t = self.create(res, self.taint[key])
            key_start, key_stop = get_interval(key)
            cursor = 0
            if key_start < len(self):
                assert key_stop < len(self)
                #cursor = self.taint[key_stop]
            else:
                if len(self) == 0:
                    # if the original string was empty, we assume that any
                    # empty string produced from it should carry the same
                    # taint.
                    #cursor = self.x()
                    pass
                #else:
                # Key start was not in the string. We can reply only
                # if the key start was just outside the string, in
                # which case, we guess.
                if key_start != len(self):
                    raise tstr.TaintException('Can\'t guess the taint')
                #cursor = self.taint[len(self) - 1] + 1
            # _tcursor gets created only for empty strings.
            t._tcursor = cursor
            return t
        else:
            assert False

class eoftstr(eoftstr):
    def t(self, i=0):
        if self.taint:
            return self.taint[i]
        else:
            if i != 0:
                raise tstr.TaintException('Invalid request idx')
            # self._tcursor gets created only for empty strings.
            # use the exception to determine which ones need it.
            return self._tcursor

class Op(enum.Enum):
    LT = 0
    LE = enum.auto()
    EQ = enum.auto()
    NE = enum.auto()
    GT = enum.auto()
    GE = enum.auto()
    IN = enum.auto()
    NOT_IN = enum.auto()
    IS = enum.auto()
    IS_NOT = enum.auto()
    FIND_STR = enum.auto()

COMPARE_OPERATORS = {
    Op.EQ: lambda x, y: x == y,
    Op.NE: lambda x, y: x != y,
    Op.IN: lambda x, y: x in y,
    Op.NOT_IN: lambda x, y: x not in y,
    Op.FIND_STR: lambda x, y: x.find(y)
}

Comparisons = []

# ### Instructions

class Instr:
    def __init__(self, o, a, b):
        self.opA = a
        self.opB = b
        self.op = o

    def o(self):
        if self.op == Op.EQ:
            return 'eq'
        elif self.op == Op.NE:
            return 'ne'
        else:
            return '?'

    def opS(self):
        if not self.opA.has_taint() and isinstance(self.opB, tstr):
            return (self.opB, self.opA)
        else:
            return (self.opA, self.opB)

    def op_A(self):
        return self.opS()[0]

    def op_B(self):
        return self.opS()[1]

    def __repr__(self):
        return "%s,%s,%s" % (self.o(), repr(self.opA), repr(self.opB))

    def __str__(self):
        if self.op == Op.EQ:
            if str(self.opA) == str(self.opB):
                return "%s = %s" % (repr(self.opA), repr(self.opB))
            else:
                return "%s != %s" % (repr(self.opA), repr(self.opB))
        elif self.op == Op.NE:
            if str(self.opA) == str(self.opB):
                return "%s = %s" % (repr(self.opA), repr(self.opB))
            else:
                return "%s != %s" % (repr(self.opA), repr(self.opB))
        elif self.op == Op.IN:
            if str(self.opA) in str(self.opB):
                return "%s in %s" % (repr(self.opA), repr(self.opB))
            else:
                return "%s not in %s" % (repr(self.opA), repr(self.opB))
        elif self.op == Op.NOT_IN:
            if str(self.opA) in str(self.opB):
                return "%s in %s" % (repr(self.opA), repr(self.opB))
            else:
                return "%s not in %s" % (repr(self.opA), repr(self.opB))
        else:
            assert False

class ctstr(eoftstr):
    def create(self, res, taint):
        o = ctstr(res, taint, self)
        o.comparisons = self.comparisons
        return o

    def add_instr(self, op, c_a, c_b):
        self.comparisons.append(Instr(op, c_a, c_b))

    def with_comparisons(self, comparisons):
        self.comparisons = comparisons
        return self

class ctstr(ctstr):
    def __eq__(self, other):
        if len(self) == 0 and len(other) == 0:
            self.add_instr(Op.EQ, self, other)
            return True
        elif len(self) == 0:
            self.add_instr(Op.EQ, self, other[0])
            return False
        elif len(other) == 0:
            self.add_instr(Op.EQ, self[0], other)
            return False
        elif len(self) == 1 and len(other) == 1:
            self.add_instr(Op.EQ, self, other)
            return super().__eq__(other)
        else:
            if not self[0] == other[0]:
                return False
            return self[1:] == other[1:]

class ctstr(ctstr):
    def __ne__(self, other):
        return not self.__eq__(other)

class ctstr(ctstr):
    def __contains__(self, other):
        self.add_instr(Op.IN, self, other)
        return super().__contains__(other)

class ctstr(ctstr):
    def find(self, sub, start=None, end=None):
        if start is None:
            start_val = 0
        else:
            start_val = start
        if end is None:
            end_val = len(self)
        else:
            end_val = end
        self.add_instr(Op.IN, self[start_val:end_val], sub)
        return super().find(sub, start, end)

class ctstr(ctstr):
    def rfind(self, sub, start=None, end=None):
        if start is None:
            start_val = 0
        else:
            start_val = start
        if end is None:
            end_val = len(self)
        else:
            end_val = end
        self.add_instr(Op.IN, self[start_val:end_val], sub)
        return super().rfind(sub, start, end)

class ctstr(ctstr):
    def startswith(self, s, beg=0, end=None):
        if end is None:
            end = len(self)
        self == s[beg:end]
        return super().startswith(s, beg, end)

def substrings(s, l):
    for i in range(len(s) - (l - 1)):
        yield s[i:i + l]

class ctstr(ctstr):
    def in_(self, s):
        if isinstance(s, str):
            # c in '0123456789'
            # to
            # __fn(c).in_('0123456789')
            # ensure that all characters are compared
            result = [self == c for c in substrings(s, len(self))]
            return any(result)
        else:
            for item in s:
                if self == item:
                    return True
            return False

class ctstr(ctstr):
    def split(self, sep=None, maxsplit=-1):
        self.add_instr(Op.IN, self, sep)
        return super().split(sep, maxsplit)

class xtstr(ctstr):
    def __find(self, substr, sub, m):
        v_ = str(substr)
        if not v_:
            return []
        v = v_.find(str(sub))
        start = substr.taint[0]
        if v == -1:
            return [(i, m) for i in range(start, start + v + len(sub))]
        else:
            return [(i, m) for i in range(start, start + len(substr))]

    def add_instr(self, op, c_a, c_b):
        ct = None
        m = get_current_method()
        if len(c_a) == 1 and isinstance(c_a, xtstr):
            ct = c_a.taint[0]
            self.comparisons.append((ct, m))
        elif len(c_b) == 1 and isinstance(c_b, xtstr):
            ct = c_b.taint[0]
            self.comparisons.append((ct, m))
        elif op == Op.IN:
            self.comparisons.extend(self.__find(c_a, c_b, m))
        elif len(c_a) == 0 or len(c_b) == 0:
            pass
        else:
            assert False, "op:%s A:%s B:%s" % (op, c_a, c_b)
        # print(repr(m))

    def create(self, res, taint):
        o = xtstr(res, taint, self)
        o.comparisons = self.comparisons
        return o

    def __hash__(self):
        return hash(str(self))

import inspect

def make_str_abort_wrapper(fun):
    def proxy(*args, **kwargs):
        raise tstr.TaintException(
            '%s Not implemented in `ostr`' % fun.__name__)
    return proxy

defined_xtstr = {}
for name, fn in inspect.getmembers(xtstr, callable):
    clz = fn.__qualname__.split('.')[0]
    if clz in {'ctstr', 'xtstr'}:
        defined_xtstr[name] = clz

for name, fn in inspect.getmembers(str, callable):
    if name not in defined_xtstr and name not in {
            '__init__', '__str__', '__eq__', '__ne__', '__class__', '__new__',
            '__setattr__', '__len__', '__getattribute__', '__le__',
            'lower', 'strip', 'lstrip', 'rstrip',
            '__iter__', '__getitem__', '__add__'}:
        setattr(xtstr, name, make_str_abort_wrapper(fn))

CURRENT_METHOD = None
METHOD_NUM_STACK = []
METHOD_MAP = {}
METHOD_NUM = 0

def get_current_method():
    return CURRENT_METHOD

def set_current_method(method, stack_depth, mid):
    global CURRENT_METHOD
    CURRENT_METHOD = (method, stack_depth, mid)
    return CURRENT_METHOD

def trace_init():
    global CURRENT_METHOD
    global METHOD_NUM_STACK
    global METHOD_MAP
    global METHOD_NUM
    CURRENT_METHOD = None
    METHOD_NUM_STACK.clear()
    METHOD_MAP.clear()
    METHOD_NUM = 0
    start = (METHOD_NUM, None, [])
    METHOD_NUM_STACK.append(start)
    METHOD_MAP[METHOD_NUM] = start

def trace_call(method):
    global CURRENT_METHOD
    global METHOD_NUM_STACK
    global METHOD_MAP
    global METHOD_NUM
    METHOD_NUM += 1
    # create our method invocation
    # method_num, method_name, children
    n = (METHOD_NUM, method, [])
    METHOD_MAP[METHOD_NUM] = n
    # add ourselves as one of the children to the previous method invocation
    METHOD_NUM_STACK[-1][2].append(n)
    # and set us as the current method.
    METHOD_NUM_STACK.append(n)

def trace_return():
    METHOD_NUM_STACK.pop()

def trace_set_method(method):
    set_current_method(method, len(METHOD_NUM_STACK), METHOD_NUM_STACK[-1][0])

class in_wrap:
    def __init__(self, s):
        self.s = s

    def in_(self, s):
        return self.s in s

def taint_wrap__(st):
    if isinstance(st, str):
        return in_wrap(st)
    else:
        return st

def wrap_input(inputstr):
    return xtstr(inputstr, parent=None).with_comparisons([])

def convert_comparisons(comparisons, inputstr):
    light_comparisons = []
    for idx, (method, stack_depth, mid) in comparisons:
        if idx is None:
            continue
        light_comparisons.append((idx, inputstr[idx], mid))
    return light_comparisons

def convert_method_map(method_map):
    light_map = {}
    for k in method_map:
        method_num, method_name, children = method_map[k]
        light_map[k] = (k, method_name, [c[0] for c in children])
    return light_map
```

We write both files to the appropriate locations.
```python
with open('build/mimid_context.py', 'w+') as f:
    print(VARS['mimid_method_context'], file=f)
    print(VARS['mimid_stack_context'], file=f)
    print(VARS['mimid_scope_context'], file=f)

with open('build/taints.py', 'w+') as f:
    print(VARS['taints_src'], file=f)
```

Here is how one can generate traces for the `calc` program.
```python
%top do(['mkdir','-p','samples/calc']).returncode
```

```python
%top do(['mkdir','-p','samples/mathexpr']).returncode
```

```python
%%top
with open('samples/calc/0.csv', 'w+') as f:
    print('9-(16+72)*3/458', file=f)

with open('samples/calc/1.csv', 'w+') as f:
    print('(9)+3/4/58', file=f)

with open('samples/calc/2.csv', 'w+') as f:
    print('8*3/40', file=f)
```

Generating traces on `mathexpr`.
```python
%%top
with open('samples/mathexpr/0.csv', 'w+') as f:
    print('100', file=f)

with open('samples/mathexpr/1.csv', 'w+') as f:
    print('2 + 3', file=f)

with open('samples/mathexpr/2.csv', 'w+') as f:
    print('4 * 5', file=f)
```

```python
%top calc_trace_out = do("python build/calculator.py samples/calc/*.csv", shell=True).stdout
```

```python
%top mathexpr_trace_out = do("python build/mathexpr.py samples/mathexpr/*.csv", shell=True).stdout
```

```python
import json
```

```python
%top calc_trace = json.loads(calc_trace_out)
```

```python
%top mathexpr_trace = json.loads(mathexpr_trace_out)
```

### Reconstructing the Method Tree with Attached Character Comparisons

Reconstruct the actual method trace from a trace with the following format:

```
key : [ mid, method_name, children_ids ]
```
```python
def reconstruct_method_tree(method_map):
    first_id = None
    tree_map = {}
    for key in method_map:
        m_id, m_name, m_children = method_map[key]
        children = []
        if m_id in tree_map:
            # just update the name and children
            assert not tree_map[m_id]
            tree_map[m_id]['id'] = m_id
            tree_map[m_id]['name'] = m_name
            tree_map[m_id]['indexes'] = []
            tree_map[m_id]['children'] = children
        else:
            assert first_id is None
            tree_map[m_id] = {'id': m_id, 'name': m_name,
                              'children': children, 'indexes': []}
            first_id = m_id
        for c in m_children:
            assert c not in tree_map
            val = {}
            tree_map[c] = val
            children.append(val)
    return first_id, tree_map
```

Here is how one would use it. The first element in the returned tuple is the id of the bottom most method call.
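For instance, on a tiny hand-written method map (hypothetical ids, following the same `mid -> (mid, name, child_ids)` shape as the trace output), the reconstruction behaves as follows. The function here is a condensed restatement of `reconstruct_method_tree()`, included only so the example is self-contained:

```python
def reconstruct(method_map):
    # Condensed restatement: parents create empty placeholder dicts for
    # their children; a later entry for that child fills the placeholder in.
    first_id, tree_map = None, {}
    for mid, name, child_ids in method_map.values():
        if mid in tree_map:
            node = tree_map[mid]   # placeholder created by our parent
        else:
            assert first_id is None
            first_id = mid         # no parent seen: this is the root
            node = tree_map[mid] = {}
        node.update({'id': mid, 'name': name, 'children': [], 'indexes': []})
        for c in child_ids:
            placeholder = {}
            tree_map[c] = placeholder
            node['children'].append(placeholder)
    return first_id, tree_map

# Hypothetical trace: a start node 0 calling main, which calls expr and digit.
method_map = {0: (0, None, [1]),
              1: (1, 'main', [2, 3]),
              2: (2, 'expr', []),
              3: (3, 'digit', [])}
first, tree = reconstruct(method_map)
# first == 0; tree[0]['children'][0] is the filled-in 'main' node.
```

Because placeholders are shared dict objects, filling in `tree_map[c]` later also updates the child entry already stored under its parent's `children` list.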
```python
from fuzzingbook.GrammarFuzzer import display_tree
```

```python
%top first, calc_method_tree1 = reconstruct_method_tree(calc_trace[0]['method_map'])
```

```python
%top first, mathexpr_method_tree1 = reconstruct_method_tree(mathexpr_trace[0]['method_map'])
```

```python
def extract_node(node, id):
    symbol = str(node['id'])
    children = node['children']
    annotation = str(node['name'])
    return "%s:%s" % (symbol, annotation), children, ''
```

```python
%top v = display_tree(calc_method_tree1[0], extract_node=extract_node)
```

```python
from IPython.display import Image
```

```python
def zoom(v, zoom=True):
    # return v directly if you do not want to zoom out.
    if zoom:
        return Image(v.render(format='png'))
    return v
```

```python
%top zoom(v)
```

```python
%top zoom(display_tree(mathexpr_method_tree1[0], extract_node=extract_node))
```

#### Identifying last comparisons

We need only the last comparisons made on any index. This means that we should care only about the last parse in an ambiguous parse. However, to make concessions for the real world, we also check whether we are overwriting a child (`HEURISTIC`). Note that `URLParser` is the only parser that needs this heuristic.
```python
def last_comparisons(comparisons):
    HEURISTIC = True
    last_cmp_only = {}
    last_idx = {}

    # get the last indexes compared in methods.
    for idx, char, mid in comparisons:
        if mid in last_idx:
            if idx > last_idx[mid]:
                last_idx[mid] = idx
        else:
            last_idx[mid] = idx

    for idx, char, mid in comparisons:
        if HEURISTIC:
            if idx in last_cmp_only:
                if last_cmp_only[idx] > mid:
                    # do not clobber children unless it was the last character
                    # for that child.
                    if last_idx[mid] > idx:
                        # if it was the last index, may be the child used it
                        # as a boundary check.
                        continue
        last_cmp_only[idx] = mid
    return last_cmp_only
```

Here is how one would use it.
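Ignoring the `HEURISTIC` guard, the core rule is simply that the last method to compare an index owns it. A toy illustration on a hypothetical comparison log:

```python
# Hypothetical comparison log entries of the form (index, char, method_id).
comparisons = [(0, '2', 2), (0, '2', 4), (1, '+', 2)]

owner = {}
for idx, _char, mid in comparisons:
    owner[idx] = mid  # last writer wins

assert owner == {0: 4, 1: 2}
```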
```python
%top calc_last_comparisons1 = last_comparisons(calc_trace[0]['comparisons'])
%top calc_last_comparisons1
%top mathexpr_last_comparisons1 = last_comparisons(mathexpr_trace[0]['comparisons'])
%top mathexpr_last_comparisons1
```

#### Attaching characters to the tree

Add the comparison indexes to the method tree that we constructed.
```python
def attach_comparisons(method_tree, comparisons):
    for idx in comparisons:
        mid = comparisons[idx]
        method_tree[mid]['indexes'].append(idx)
```

Here is how one would use it. Note which method call each input index is associated with. For example, the first index is associated with method call id 6, which corresponds to `is_digit`.
```python
%top attach_comparisons(calc_method_tree1, calc_last_comparisons1)
%top calc_method_tree1
%top attach_comparisons(mathexpr_method_tree1, mathexpr_last_comparisons1)
%top mathexpr_method_tree1
```

```python
def wrap_input(istr):
    def extract_node(node, id):
        symbol = str(node['id'])
        children = node['children']
        annotation = str(node['name'])
        indexes = repr(tuple([istr[i] for i in node['indexes']]))
        return "%s %s" % (annotation, indexes), children, ''
    return extract_node
```

```python
%top extract_node1 = wrap_input(calc_trace[0]['inputstr'])
%top zoom(display_tree(calc_method_tree1[0], extract_node=extract_node1))
%top extract_node1 = wrap_input(mathexpr_trace[0]['inputstr'])
%top zoom(display_tree(mathexpr_method_tree1[0], extract_node=extract_node1))
```

We define `to_node()`, a convenience function that, given a list of _contiguous_ indexes and the original string, translates it to a leaf node of a tree (corresponding to the derivation tree syntax in the Fuzzingbook) with a string, empty children, and the starting and ending indexes.
Convert a list of indexes to a corresponding terminal tree node.
```python
def to_node(idxes, my_str):
    assert len(idxes) == idxes[-1] - idxes[0] + 1
    assert min(idxes) == idxes[0]
    assert max(idxes) == idxes[-1]
    return my_str[idxes[0]:idxes[-1] + 1], [], idxes[0], idxes[-1]
```

Here is how one would use it.
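For instance, on a hypothetical input string (independent of the traced subjects), three contiguous indexes map to one terminal node (this standalone copy of `to_node()` is just for illustration):

```python
def to_node(idxes, my_str):
    # sanity check: the indexes must be a sorted, contiguous run
    assert len(idxes) == idxes[-1] - idxes[0] + 1
    return my_str[idxes[0]:idxes[-1] + 1], [], idxes[0], idxes[-1]

# indexes 5..7 of "12 + 345" cover the substring "345"
assert to_node([5, 6, 7], "12 + 345") == ("345", [], 5, 7)
```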
```python
%top to_node(calc_method_tree1[6]['indexes'], calc_trace[0]['inputstr'])
```

```python
from operator import itemgetter
import itertools as it
```

We now need to identify the terminal (leaf) nodes. For that, we want to group contiguous letters in a node together, and call the group a leaf node. So, we convert our list of indexes to lists of contiguous indexes first, then convert those to terminal tree nodes. Then, we return a set of one-level child nodes with contiguous chars from the indexes.
```python
def indexes_to_children(indexes, my_str):
    lst = [
        list(map(itemgetter(1), g))
        for k, g in it.groupby(enumerate(indexes), lambda x: x[0] - x[1])
    ]
    return [to_node(n, my_str) for n in lst]
```

```python
%top indexes_to_children(calc_method_tree1[6]['indexes'], calc_trace[0]['inputstr'])
```

Finally, we need to remove the overlap from the trees we have so far. The idea is that, given a node, each child node of that node should be uniquely responsible for a specified range of characters, with no overlap allowed between the children. The range of the node extends from the start of its first child to the end of its last child.
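The `groupby` trick used in `indexes_to_children()` above is worth seeing in isolation: consecutive integers share the same `position - value` difference, so each group is one contiguous run.

```python
from operator import itemgetter
import itertools as it

def contiguous_runs(indexes):
    # enumerate pairs each index with its position; a run of consecutive
    # values keeps position - value constant, so groupby splits on runs.
    return [list(map(itemgetter(1), g))
            for _k, g in it.groupby(enumerate(indexes), lambda x: x[0] - x[1])]

assert contiguous_runs([0, 1, 2, 5, 6, 9]) == [[0, 1, 2], [5, 6], [9]]
```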
#### Removing Overlap

If overlap is found, the tie is biased toward the later child. That is, the later child gets to keep the range, and the former child is recursively traversed to remove overlaps from its children. If a child is completely included in the overlap, the child is excised. A few convenience functions first:
```python
def does_item_overlap(s, e, s_, e_):
    return (s_ >= s and s_ <= e) or (e_ >= s and e_ <= e) or (s_ <= s and e_ >= e)

def is_second_item_included(s, e, s_, e_):
    return (s_ >= s and e_ <= e)

def has_overlap(ranges, s_, e_):
    return {(s, e) for (s, e) in ranges if does_item_overlap(s, e, s_, e_)}

def is_included(ranges, s_, e_):
    return {(s, e) for (s, e) in ranges if is_second_item_included(s, e, s_, e_)}
```

```python
def remove_overlap_from(original_node, orange):
    node, children, start, end = original_node
    new_children = []
    if not children:
        return None
    start = -1
    end = -1
    for child in children:
        if does_item_overlap(*child[2:4], *orange):
            new_child = remove_overlap_from(child, orange)
            if new_child:  # and new_child[1]:
                if start == -1:
                    start = new_child[2]
                new_children.append(new_child)
                end = new_child[3]
        else:
            new_children.append(child)
            if start == -1:
                start = child[2]
            end = child[3]
    if not new_children:
        return None
    assert start != -1
    assert end != -1
    return (node, new_children, start, end)
```

Verify that there is no overlap.
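Before that, a quick standalone check of the `does_item_overlap()` predicate defined above, on hypothetical ranges:

```python
def does_item_overlap(s, e, s_, e_):
    # overlap if either endpoint of the second range falls inside the
    # first, or the second range fully covers the first.
    return (s <= s_ <= e) or (s <= e_ <= e) or (s_ <= s and e_ >= e)

assert does_item_overlap(0, 5, 3, 8)       # partial overlap
assert does_item_overlap(2, 3, 0, 9)       # second range covers the first
assert not does_item_overlap(0, 2, 3, 5)   # disjoint ranges
```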
```python
def no_overlap(arr):
    my_ranges = {}
    for a in arr:
        _, _, s, e = a
        included = is_included(my_ranges, s, e)
        if included:
            continue  # we will fill up the blanks later.
        else:
            overlaps = has_overlap(my_ranges, s, e)
            if overlaps:
                # unlike include which can happen only once in a set of
                # non-overlapping ranges, overlaps can happen on multiple parts.
                # The rule is, the later child gets the say. So, we recursively
                # remove any ranges that overlap with the current one from the
                # overlapped range.
                assert len(overlaps) == 1
                oitem = list(overlaps)[0]
                v = remove_overlap_from(my_ranges[oitem], (s, e))
                del my_ranges[oitem]
                if v:
                    my_ranges[v[2:4]] = v
                my_ranges[(s, e)] = a
            else:
                my_ranges[(s, e)] = a
    res = my_ranges.values()
    # assert no overlap, and order by starting index
    s = sorted(res, key=lambda x: x[2])
    return s
```

#### Generate derivation tree

Convert a mapped tree to the _fuzzingbook_ style derivation tree.
```python
def to_tree(node, my_str):
    method_name = ("<%s>" % node['name']) if node['name'] is not None else '<START>'
    indexes = node['indexes']
    node_children = [to_tree(c, my_str) for c in node.get('children', [])]
    idx_children = indexes_to_children(indexes, my_str)
    children = no_overlap([c for c in node_children if c is not None] + idx_children)
    if not children:
        return None
    start_idx = children[0][2]
    end_idx = children[-1][3]
    si = start_idx
    my_children = []
    # FILL IN chars that we did not compare. This is likely due to an i + n
    # instruction.
    for c in children:
        if c[2] != si:
            sbs = my_str[si: c[2]]
            my_children.append((sbs, [], si, c[2] - 1))
        my_children.append(c)
        si = c[3] + 1
    m = (method_name, my_children, start_idx, end_idx)
    return m
```

```python
%top zoom(display_tree(to_tree(calc_method_tree1[0], calc_trace[0]['inputstr'])))
%top zoom(display_tree(to_tree(mathexpr_method_tree1[0], mathexpr_trace[0]['inputstr'])))
```

### The Complete Miner

We now put everything together. The `miner()` takes the traces, produces trees out of them, and verifies that the trees actually correspond to the input.
```python
from fuzzingbook.GrammarFuzzer import tree_to_string
```

```python
def miner(call_traces):
    my_trees = []
    for call_trace in call_traces:
        method_map = call_trace['method_map']
        first, method_tree = reconstruct_method_tree(method_map)
        comparisons = call_trace['comparisons']
        attach_comparisons(method_tree, last_comparisons(comparisons))
        my_str = call_trace['inputstr']
        #print("INPUT:", my_str, file=sys.stderr)
        tree = to_tree(method_tree[first], my_str)
        #print("RECONSTRUCTED INPUT:", tree_to_string(tree), file=sys.stderr)
        my_tree = {'tree': tree, 'original': call_trace['original'],
                   'arg': call_trace['arg']}
        assert tree_to_string(tree) == my_str
        my_trees.append(my_tree)
    return my_trees
```

Using the `miner()`:
```python
%top mined_calc_trees = miner(calc_trace)
%top calc_tree = mined_calc_trees[0]
%top zoom(display_tree(calc_tree['tree']))
```

```python
%top mined_mathexpr_trees = miner(mathexpr_trace)
%top mathexpr_tree = mined_mathexpr_trees[1]
%top zoom(display_tree(mathexpr_tree['tree']))
```

One of the problems that you can notice in the generated tree is that each `while` iteration gets a different identifier, e.g.

```
 ('<parse_expr:while_1 ? [2]>', [('+', [], 5, 5)], 5, 5),
 ('<parse_expr:while_1 ? [3]>',
  [('<parse_expr:if_1 + 0#[3, -1]>',
    [('<parse_num>',
      [('<is_digit>', [('7', [], 6, 6)], 6, 6),
       ('<is_digit>', [('2', [], 7, 7)], 7, 7)],
      6,
      7)],
    6,
    7)],
```

The separate identifiers are intentional, because we do not yet know the actual dependencies between different iterations such as closing quotes, braces, or parentheses. However, this creates a problem when we mine the grammar, because we need to match up the compatible nodes. The generalizer does this by actively doing surgery on the tree to see whether a node is replaceable with another.
```python
import copy
import random
```

### Checking compatibility of nodes

We first need a few helper functions. The `replace_nodes()` function tries to replace the _contents_ of the first node with the _contents_ of the second (that is, the tree containing these nodes is automatically modified), collects the produced string from the tree, and resets any changes. The arguments are tuples with the following format: `(node, file_name, tree)`.
```python
def replace_nodes(a2, a1):
    node2, _, t2 = a2
    node1, _, t1 = a1
    str2_old = tree_to_string(t2)
    old = copy.copy(node2)
    node2.clear()
    for n in node1:
        node2.append(n)
    str2_new = tree_to_string(t2)
    assert str2_old != str2_new
    node2.clear()
    for n in old:
        node2.append(n)
    str2_last = tree_to_string(t2)
    assert str2_old == str2_last
    return str2_new
```

Can a given node be replaced with another? The idea is, given two nodes (possibly from two trees), can the first node be replaced by the second, and still result in a valid string?
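The reason `replace_nodes()` can work in place is that the trees are converted to nested lists (see `to_modifiable()` below): clearing and refilling a child list mutates the tree that contains it. A minimal sketch:

```python
# Toy modifiable tree: nested lists rather than tuples, so in-place
# surgery on a node is visible from the enclosing tree.
t = ['<E>', [['1', [], 0, 0]], 0, 0]
node = t[1][0]
saved = node[:]            # snapshot for restoring

node.clear()
node.extend(['2', [], 0, 0])
assert t[1][0][0] == '2'   # the enclosing tree sees the change

node.clear()
node.extend(saved)         # undo the surgery
assert t[1][0][0] == '1'
```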
xxxxxxxxxxdef is_compatible(a1, a2, module): if tree_to_string(a1[0]) == tree_to_string(a2[0]): return True my_string = replace_nodes(a1, a2) return check(my_string, module)xxxxxxxxxx%%var check_src# [(import sys, impparse_ = imp.new_module('parse_')def init_module(src): with open(src) as sf: exec(sf.read(), parse_.__dict__)def _check(s): try: parse_.main(s) return True except: return Falseimport sysdef main(args): init_module(args[0]) if _check(args[1]): sys.exit(0) else: sys.exit(1)import sysmain(sys.argv[1:])# )]xxxxxxxxxx# [(with open('build/check.py', 'w+') as f: print(VARS['check_src'], file=f)# )]xxxxxxxxxxEXEC_MAP = {}NODE_REGISTER = {}TREE = NoneFILE = Nonexxxxxxxxxxdef reset_generalizer(): global NODE_REGISTER, TREE, FILE, EXEC_MAP NODE_REGISTER={} TREE = None FILE = None EXEC_MAP = {}xxxxxxxxxxreset_generalizer()xxxxxxxxxximport os.pathxxxxxxxxxxdef check(s, module): if s in EXEC_MAP: return EXEC_MAP[s] result = do(["python", "./build/check.py", "subjects/%s" % module, s], shell=False) with open('build/%s.log' % module, 'a+') as f: print(s, file=f) print(' '.join(["python", "./build/check.py", "subjects/%s" % module, s]), file=f) print(":=", result.returncode, file=f) print("\n", file=f) v = (result.returncode == 0) EXEC_MAP[s] = v return vxxxxxxxxxxdef to_modifiable(derivation_tree): node, children, *rest = derivation_tree return [node, [to_modifiable(c) for c in children], *rest]xxxxxxxxxx%top calc_tree_ = to_modifiable(calc_tree['tree'])%top while_loops = calc_tree_[1][0][1][0][1]xxxxxxxxxx%top while_loops[0]xxxxxxxxxx%top while_loops[1]xxxxxxxxxx%top assert not is_compatible((while_loops[1], 'c.py', calc_tree_), (while_loops[0], 'c.py', calc_tree_), 'calculator.py')xxxxxxxxxx%top assert is_compatible((while_loops[0], 'c.py', calc_tree_), (while_loops[2], 'c.py', calc_tree_), 'calculator.py')We need to extract meta information from the names, and update it back. 
TODO: make the meta info JSON.
```python
import json

def parse_name(name):
    assert name[0] + name[-1] == '<>'
    name = name[1:-1]
    method, rest = name.split(':')
    ctrl_name, space, rest = rest.partition(' ')
    can_empty, space, stack = rest.partition(' ')
    ctrl, cname = ctrl_name.split('_')
    if ':while_' in name:
        method_stack = json.loads(stack)
        return method, ctrl, int(cname), 0, can_empty, method_stack
    elif ':if_' in name:
        num, mstack = stack.split('#')
        method_stack = json.loads(mstack)
        return method, ctrl, int(cname), num, can_empty, method_stack
```

```python
%top [parse_name(w[0]) for w in while_loops]
```

```python
def unparse_name(method, ctrl, name, num, can_empty, cstack):
    if ctrl == 'while':
        return "<%s:%s_%s %s %s>" % (method, ctrl, name, can_empty, json.dumps(cstack))
    else:
        return "<%s:%s_%s %s %s#%s>" % (method, ctrl, name, can_empty, num, json.dumps(cstack))
```

Verify that parsing and unparsing works.
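As a standalone illustration of the label format, here is a simplified re-implementation that handles only `while` labels:

```python
import json

def parse_while_name(name):
    # sketch: handles '<method:while_N can_empty [stack]>' labels only
    assert name[0] + name[-1] == '<>'
    method, rest = name[1:-1].split(':')
    ctrl_name, _, rest = rest.partition(' ')
    can_empty, _, stack = rest.partition(' ')
    ctrl, cname = ctrl_name.split('_')
    return method, ctrl, int(cname), can_empty, json.loads(stack)

assert parse_while_name('<parse_expr:while_1 ? [2]>') == \
    ('parse_expr', 'while', 1, '?', [2])
```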
```python
%top assert all(unparse_name(*parse_name(w[0])) == w[0] for w in while_loops)
```

### Propagate rename of the `while` node to the child nodes

The `update_stack()` function, when given a node and a new name, recursively updates the method stack in the children.
```python
def update_stack(node, at, new_name):
    nname, children, *rest = node
    if not (':if_' in nname or ':while_' in nname):
        return
    method, ctrl, cname, num, can_empty, cstack = parse_name(nname)
    cstack[at] = new_name
    name = unparse_name(method, ctrl, cname, num, can_empty, cstack)
    #assert '?' not in name
    node[0] = name
    for c in children:
        update_stack(c, at, new_name)
```

Update the node name once we have identified that it corresponds to a global name.
```python
def update_name(k_m, my_id, seen):
    # fixup k_m with what is in my_id, and update seen.
    original = k_m[0]
    method, ctrl, cname, num, can_empty, cstack = parse_name(original)
    #assert can_empty != '?'
    cstack[-1] = float('%d.0' % my_id)
    name = unparse_name(method, ctrl, cname, num, can_empty, cstack)
    seen[k_m[0]] = name
    k_m[0] = name
    # only replace it at the len(cstack) - 1
    # until the first non-cf token
    for c in k_m[1]:
        update_stack(c, len(cstack) - 1, cstack[-1])
    return name, k_m
```

Note that the rename happens only within the current method stack. That is, it does not propagate across method calls. Here is how one would use it.
```python
%top while_loops[2]
```

We update the iteration number `3` with a global id `4.0`.
```python
%top name, node = update_name(while_loops[2], 4, {})
%top node
```

##### Replace a set of nodes

We want to replace the `while` loop iteration identifiers with a global identifier. For that, we are given a list of nodes that are compatible with global ones. We first extract the iteration id from the global node, and apply it to the `while` node under consideration.
```python
def replace_stack_and_mark_star(to_replace):
    # remember, we only replace whiles.
    for (i, j) in to_replace:
        method1, ctrl1, cname1, num1, can_empty1, cstack1 = parse_name(i[0])
        method2, ctrl2, cname2, num2, can_empty2, cstack2 = parse_name(j[0])
        assert method1 == method2
        assert ctrl1 == ctrl2
        assert cname1 == cname2
        #assert can_empty2 != '?'
        # fixup the can_empty
        new_name = unparse_name(method1, ctrl1, cname1, num1, can_empty2, cstack1)
        i[0] = new_name
        assert len(cstack1) == len(cstack2)
        update_stack(i, len(cstack2) - 1, cstack2[-1])
    to_replace.clear()
```

### Generalize a given set of loops

The main workhorse. It generalizes the looping constructs. It is given a set of while loops with the same label under the current node. TODO: Refactor when we actually have time.
##### Helper: node inclusion

Checking for node inclusion. We do not want to try including the first node in the second if the first node already contains the second; it would lead to an infinite loop in `tree_to_string()`.
```python
def node_include(i, j):
    name_i, children_i, s_i, e_i = i
    name_j, children_j, s_j, e_j = j
    return s_i <= s_j and e_i >= e_j
```

##### Helper: sorting

Ordering nodes by their highest complexity to avoid spurious can-replace answers.
```python
def num_tokens(v, s):
    name, child, *rest = v
    s.add(name)
    [num_tokens(i, s) for i in child]
    return len(s)

def s_fn(v):
    return num_tokens(v[0], set())
```

```python
MAX_SAMPLES = 1  # with reasonably complex inputs, this is sufficient if we do the surgery both ways.
```

First, we check whether any of the loops we have are compatible with the globally registered loops in `while_register`.
```python
def check_registered_loops_for_compatibility(idx_map, while_register, module):
    seen = {}
    to_replace = []
    idx_keys = sorted(idx_map.keys())
    for while_key, f in while_register[0]:
        # try sampling here.
        my_values = while_register[0][(while_key, f)]
        v_ = random.choice(my_values)
        for k in idx_keys:
            k_m = idx_map[k]
            if k_m[0] in seen:
                continue
            if len(my_values) > MAX_SAMPLES:
                lst = [v for v in my_values if not node_include(v[0], k_m)]
                values = sorted(lst, key=s_fn, reverse=True)[0:MAX_SAMPLES]
            else:
                values = my_values
            # all values in v should be tried.
            replace = 0
            for v in values:
                assert v[0][0] == v_[0][0]
                if f != FILE or not node_include(v[0], k_m):
                    # if not k_m includes v
                    a = is_compatible((k_m, FILE, TREE), v, module)
                    if not a:
                        replace = 0
                        break
                    else:
                        replace += 1
                if f != FILE or not node_include(k_m, v[0]):
                    b = is_compatible(v, (k_m, FILE, TREE), module)
                    if not b:
                        replace = 0
                        break
                    else:
                        replace += 1
            # at least one needs to vouch, and all capable need to agree.
            if replace:
                to_replace.append((k_m, v_[0]))  # <- replace k_m by v
                seen[k_m[0]] = True
    replace_stack_and_mark_star(to_replace)
```

Next, for all the loops that remain, we check whether they can be deleted. If they can be, we want to place `Epsilon == *` in place of `?` in the `can_empty` position.
```python
def can_the_loop_be_deleted(idx_map, while_register, module):
    idx_keys = sorted(idx_map.keys())
    for i in idx_keys:
        i_m = idx_map[i]
        if '.0' in i_m[0]:
            # assert '?' not in i_m[0]
            continue
        a = is_compatible((i_m, FILE, TREE), (['', [], 0, 0], FILE, TREE), module)
        method1, ctrl1, cname1, num1, can_empty, cstack1 = parse_name(i_m[0])
        name = unparse_name(method1, ctrl1, cname1, num1, Epsilon if a else NoEpsilon, cstack1)
        i_m[0] = name
```

Next, we check whether the current loops are compatible with each other. Essentially, we start from the back, and check whether the first, second, third, ... nodes are compatible with the last node. Then we take the second-to-last node and do the same. If they are compatible, we use the same name for all compatible nodes.
```python
def check_current_loops_for_compatibility(idx_map, while_register, module):
    to_replace = []
    rkeys = sorted(idx_map.keys(), reverse=True)
    for i in rkeys:  # <- nodes to check for replacement -- started from the back
        i_m = idx_map[i]
        # assert '?' not in i_m[0]
        if '.0' in i_m[0]:
            continue
        j_keys = sorted([j for j in idx_map.keys() if j < i])
        for j in j_keys:  # <- nodes that we can replace i_m with -- starting from the front.
            j_m = idx_map[j]
            # assert '?' not in j_m[0]
            if i_m[0] == j_m[0]:
                break  # previous whiles worked.
            replace = False
            if not node_include(j_m, i_m):
                a = is_compatible((i_m, FILE, TREE), (j_m, FILE, TREE), module)
                if not a:
                    continue
                replace = True
            if not node_include(i_m, j_m):
                b = is_compatible((j_m, FILE, TREE), (i_m, FILE, TREE), module)
                if not b:
                    continue
                replace = True
            if replace:
                to_replace.append((i_m, j_m))  # <- replace i_m by j_m
                break
    replace_stack_and_mark_star(to_replace)
```

Finally, register all the new while loops discovered.
```python
def register_new_loops(idx_map, while_register):
    idx_keys = sorted(idx_map.keys())
    seen = {}
    for k in idx_keys:
        k_m = idx_map[k]
        if ".0" not in k_m[0]:
            if k_m[0] in seen:
                k_m[0] = seen[k_m[0]]
                # and update
                method1, ctrl1, cname1, num1, can_empty1, cstack1 = parse_name(k_m[0])
                update_name(k_m, cstack1[-1], seen)
                continue
            # new! get a brand new name!
            while_register[1] += 1
            my_id = while_register[1]
            original_name = k_m[0]
            #assert '?' not in original_name
            name, new_km = update_name(k_m, my_id, seen)
            #assert '?' not in name
            while_register[0][(name, FILE)] = [(new_km, FILE, TREE)]
        else:
            name = k_m[0]
            if (name, FILE) not in while_register[0]:
                while_register[0][(name, FILE)] = []
            while_register[0][(name, FILE)].append((k_m, FILE, TREE))
```

All together:
```python
def generalize_loop(idx_map, while_register, module):
    # First we check the previous while loops
    check_registered_loops_for_compatibility(idx_map, while_register, module)
    # Check whether any of these can be deleted.
    can_the_loop_be_deleted(idx_map, while_register, module)
    # then we check the current while iterations
    check_current_loops_for_compatibility(idx_map, while_register, module)
    # lastly, update all while names.
    register_new_loops(idx_map, while_register)
```

We keep a global registry of nodes, so that we can use the same iteration labels.
```python
# NODE_REGISTER = {}
```

### Collect loops to generalize

The idea is to look through the tree, looking for while loops. When one sees a while loop, start at one end, and see if the while iteration index can be replaced by the first one, and vice versa. If not, try with the second one and so on until the first one succeeds. When one succeeds, replace the definition of the matching one with an alternate with the last's definition, and replace the name of the last with the first, and delete the last. Here, we only collect the while loops with same labels, with `generalize_loop()` doing the rest.
```python
def generalize(tree, module):
    node, children, *_rest = tree
    if node not in NODE_REGISTER:
        NODE_REGISTER[node] = {}
    register = NODE_REGISTER[node]
    for child in children:
        generalize(child, module)
    idxs = {}
    last_while = None
    for i, child in enumerate(children):
        # now we need to map the while_name here to the ones in node
        # register. Essentially, we try to replace each.
        if ':while_' not in child[0]:
            continue
        while_name = child[0].split(' ')[0]
        if last_while is None:
            last_while = while_name
        if while_name not in register:
            register[while_name] = [{}, 0]
        else:
            if last_while != while_name:
                # a new while! Generalize the last
                last_while = while_name
                generalize_loop(idxs, register[last_while], module)
        idxs[i] = child
    if last_while is not None:
        generalize_loop(idxs, register[last_while], module)
```

We need the ability for fairly deep surgery, so we convert the tuples in the mined trees to modifiable lists.
```python
def generalize_iter(jtrees, log=False):
    global TREE, FILE
    new_trees = []
    for j in jtrees:
        FILE = j['arg']
        if log:
            print(FILE, file=sys.stderr)
            sys.stderr.flush()
        TREE = to_modifiable(j['tree'])
        generalize(TREE, j['original'])
        j['tree'] = TREE
        new_trees.append(copy.deepcopy(j))
    return new_trees
```

```python
from fuzzingbook.GrammarFuzzer import extract_node as extract_node_o
```

```python
%top reset_generalizer()
%top generalized_calc_trees = generalize_iter(mined_calc_trees)
%top zoom(display_tree(generalized_calc_trees[0]['tree'], extract_node=extract_node_o))
```

```python
%top reset_generalizer()
%top generalized_mathexpr_trees = generalize_iter(mined_mathexpr_trees)
%top zoom(display_tree(generalized_mathexpr_trees[1]['tree'], extract_node=extract_node_o))
```

## Generating a Grammar

Generating a grammar from the generalized derivation trees is pretty simple. Start at the start node; any node that represents a method or a pseudo method becomes a nonterminal, and its children form alternate expansions for the nonterminal. Since all the keys are compatible, merging grammars is simply merging the hash maps.
First, we define a pretty printer for the grammar.
```python
import re

RE_NONTERMINAL = re.compile(r'(<[^<> ]*>)')
```

```python
def recurse_grammar(grammar, key, order, canonical):
    rules = sorted(grammar[key])
    old_len = len(order)
    for rule in rules:
        if not canonical:
            res = re.findall(RE_NONTERMINAL, rule)
        else:
            res = rule
        for token in res:
            if token.startswith('<') and token.endswith('>'):
                if token not in order:
                    order.append(token)
    new = order[old_len:]
    for ckey in new:
        recurse_grammar(grammar, ckey, order, canonical)

def show_grammar(grammar, start_symbol='<START>', canonical=True):
    order = [start_symbol]
    recurse_grammar(grammar, start_symbol, order, canonical)
    assert len(order) == len(grammar.keys())
    return {k: sorted(grammar[k]) for k in order}
```

```python
def to_grammar(tree, grammar):
    node, children, _, _ = tree
    tokens = []
    if node not in grammar:
        grammar[node] = list()
    for c in children:
        if c[1] == []:
            tokens.append(c[0])
        else:
            tokens.append(c[0])
            to_grammar(c, grammar)
    grammar[node].append(tuple(tokens))
    return grammar

def merge_grammar(g1, g2):
    all_keys = set(list(g1.keys()) + list(g2.keys()))
    merged = {}
    for k in all_keys:
        alts = set(g1.get(k, []) + g2.get(k, []))
        merged[k] = alts
    return {k: [l for l in merged[k]] for k in merged}

def convert_to_grammar(my_trees):
    grammar = {}
    for my_tree in my_trees:
        tree = my_tree['tree']
        src = my_tree['original']
        g = to_grammar(tree, grammar)
        grammar = merge_grammar(grammar, g)
    return grammar
```

```python
%top calc_grammar = convert_to_grammar(generalized_calc_trees)
%top show_grammar(calc_grammar)
```

```python
%top mathexpr_grammar = convert_to_grammar(generalized_mathexpr_trees)
%top show_grammar(mathexpr_grammar)
```

The grammar generated may still contain meta characters such as `<` and `>`. We need to clean these up to make it a grammar that is fuzzable using the Fuzzingbook fuzzers.
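To make the tree-to-grammar conversion above concrete, here is `to_grammar()` applied to a hypothetical two-level derivation tree:

```python
def to_grammar(tree, grammar):
    # each nonterminal node contributes one alternative: the tuple of
    # its children's labels; nonterminal children are recursed into.
    node, children, *_ = tree
    grammar.setdefault(node, [])
    tokens = []
    for c in children:
        tokens.append(c[0])
        if c[1]:
            to_grammar(c, grammar)
    grammar[node].append(tuple(tokens))
    return grammar

tree = ('<START>', [('<digit>', [('1', [], 0, 0)], 0, 0)], 0, 0)
g = to_grammar(tree, {})
assert g == {'<START>': [('<digit>',)], '<digit>': [('1',)]}
```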
```python
def to_fuzzable_grammar(grammar):
    def escape(t):
        if t and (t[0] + t[-1]) == '<>':
            return t.replace(' ', '_')
        else:
            return t

    new_g = {}
    for k in grammar:
        new_alt = []
        for rule in grammar[k]:
            new_alt.append(''.join([escape(t) for t in rule]))
        new_g[k.replace(' ', '_')] = new_alt
    return new_g
```

```python
from fuzzingbook import GrammarFuzzer
```

```python
%%top
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(calc_grammar), start_symbol='<START>')
for i in range(10):
    print(gf.fuzz())
```

```python
%%top
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(mathexpr_grammar), start_symbol='<START>')
for i in range(10):
    print(gf.fuzz())
```

### Inserting Empty Alternatives for IF and Loops

Next, we want to insert empty rules for those loops and conditionals that can be skipped. For loops, the entire sequence has to contain the empty marker.
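The `escape()` step above only rewrites spaces inside nonterminal names; a quick standalone check (the label is taken from the mined-tree examples earlier):

```python
def escape(t):
    # nonterminals (bracketed tokens) get their spaces replaced so the
    # fuzzer does not split them; terminals pass through unchanged.
    if t and (t[0] + t[-1]) == '<>':
        return t.replace(' ', '_')
    return t

assert escape('<parse_expr:while_1 ? [2]>') == '<parse_expr:while_1_?_[2]>'
assert escape('+') == '+'
```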
xxxxxxxxxx
def check_empty_rules(grammar):
    new_grammar = {}
    for k in grammar:
        if ':if_' in k:
            name, marker = k.split('#')
            if name.endswith(' *'):
                new_grammar[k] = grammar[k] + [('',)]
            else:
                new_grammar[k] = grammar[k]
        elif ':while_' in k:
            # TODO -- we have to check the rules for sequences of whiles.
            # For now, ignore.
            new_grammar[k] = grammar[k]
        else:
            new_grammar[k] = grammar[k]
    return new_grammar

xxxxxxxxxx
%top ne_calc_grammar = check_empty_rules(calc_grammar)
%top show_grammar(ne_calc_grammar)

xxxxxxxxxx
%%top
# [(
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(ne_calc_grammar), start_symbol='<START>')
for i in range(10):
    print(repr(gf.fuzz()))
# )]

xxxxxxxxxx
%top ne_mathexpr_grammar = check_empty_rules(mathexpr_grammar)
%top show_grammar(ne_mathexpr_grammar)

xxxxxxxxxx
%%top
# [(
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(ne_mathexpr_grammar), start_symbol='<START>')
for i in range(10):
    print(repr(gf.fuzz()))
# )]

We now need to generalize the loops. The idea is to look for patterns exclusively in the similarly named while loops, using any of the regular expression learners. For the prototype, we replaced the modified Sequitur with the modified Fernau algorithm, which gave us better regular expressions than before. The main constraint is that we want to avoid repeated executions of the program if possible. The Fernau algorithm can recover a reasonably approximate regular expression based only on positive data.
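The empty-alternative insertion above can be sketched in isolation. The key name below is made up for illustration; the point is only that a skippable `if` region gains an empty alternative, so that generated inputs may omit it entirely:

```python
def add_empty_alt(grammar):
    # hypothetical simplified sketch: give every ':if_' key an empty
    # alternative (the real check_empty_rules() also inspects the marker).
    return {k: (rules + [('',)] if ':if_' in k and ('',) not in rules else rules)
            for k, rules in grammar.items()}

g = {'<parse:if_1 #T>': [('<digit>',)], '<digit>': [('1',)]}
g2 = add_empty_alt(g)
print(g2)  # → {'<parse:if_1 #T>': [('<digit>',), ('',)], '<digit>': [('1',)]}
```

With the empty alternative in place, a grammar fuzzer can choose to skip the conditional region altogether.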
#### The modified Fernau algorithm

The Fernau algorithm is from _Algorithms for learning regular expressions from positive data_ by _Henning Fernau_. Our algorithm uses a modified form of the Prefix-Tree-Acceptor from Fernau. First we define an LRF buffer of a given size.
xxxxxxxxxx
import json

class Buf:
    def __init__(self, size):
        self.size = size
        self.items = [None] * self.size

The `add1()` method takes an array, transfers the first element of the array to the end of the current buffer, and simultaneously drops the first element of the buffer.
xxxxxxxxxx
class Buf(Buf):
    def add1(self, items):
        self.items.append(items.pop(0))
        return self.items.pop(0)

For equality between the buffer and an array, we compare only when both the array and the buffer contents are plain elements rather than chunked arrays.
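The sliding behavior of `add1()` can be seen on a small example (the class is repeated here, flattened into one definition, so the cell runs standalone):

```python
class Buf:
    def __init__(self, size):
        self.size = size
        self.items = [None] * self.size

    def add1(self, items):
        # shift one element from the front of `items` into the buffer,
        # evicting the buffer's oldest entry.
        self.items.append(items.pop(0))
        return self.items.pop(0)

b = Buf(2)
rest = ['a', 'b', 'c']
b.add1(rest)   # buffer: [None, 'a'], rest: ['b', 'c']
b.add1(rest)   # buffer: ['a', 'b'],  rest: ['c']
print(b.items, rest)  # → ['a', 'b'] ['c']
```

The buffer thus always holds the last `size` elements consumed, which is exactly the window needed for repetition detection.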
xxxxxxxxxx
class Buf(Buf):
    def __eq__(self, items):
        if any(isinstance(i, dict) for i in self.items):
            return False
        if any(isinstance(i, dict) for i in items):
            return False
        return items == self.items

The `detect_chunks()` function detects any repeating portions of size `n` in a list.
xxxxxxxxxx
def detect_chunks(n, lst_):
    lst = list(lst_)
    chunks = set()
    last = Buf(n)
    # check if the next n elements are repeated.
    for _ in range(len(lst) - n):
        lnext_n = lst[0:n]
        if last == lnext_n:
            # found a repetition.
            chunks.add(tuple(last.items))
        else:
            pass
        last.add1(lst)
    return chunks

Once we have detected plausible repeating sequences, we gather all similar sequences into arrays.
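The idea behind the detector can be illustrated without the `Buf` class. The sketch below is a simplified window-based variant (named `detect_repeats` to distinguish it from the implementation above): a window of size `n` that equals the `n` elements immediately following it is a repeated chunk.

```python
def detect_repeats(n, lst):
    # simplified illustration of chunk detection, not the Buf-based version.
    chunks = set()
    for i in range(len(lst) - 2 * n + 1):
        if lst[i:i + n] == lst[i + n:i + 2 * n]:
            chunks.add(tuple(lst[i:i + n]))
    return chunks

out = detect_repeats(2, ['1', '+', '1', '+', '1'])
print(out)  # both ('1', '+') and ('+', '1') repeat in this token list
```

On the token list of `1+1+1`, both two-token windows repeat, so both are candidate chunks.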
xxxxxxxxxx
def chunkify(lst_, n, chunks):
    lst = list(lst_)
    chunked_lst = []
    while len(lst) >= n:
        lnext_n = lst[0:n]
        if (not any(isinstance(i, dict) for i in lnext_n)) and tuple(lnext_n) in chunks:
            chunked_lst.append({'_': lnext_n})
            lst = lst[n:]
        else:
            chunked_lst.append(lst.pop(0))
    chunked_lst.extend(lst)
    return chunked_lst

The `identify_chunks()` function simply calls `detect_chunks()` on all given lists, and then converts all identified chunks into arrays.
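Applying `chunkify()` to the `1+1+1` token list with the chunk `('1', '+')` groups the two repetitions and leaves the trailing token alone (the function is repeated so the cell runs standalone):

```python
def chunkify(lst_, n, chunks):
    lst = list(lst_)
    chunked_lst = []
    while len(lst) >= n:
        lnext_n = lst[0:n]
        if (not any(isinstance(i, dict) for i in lnext_n)) and tuple(lnext_n) in chunks:
            chunked_lst.append({'_': lnext_n})  # wrap a recognized chunk
            lst = lst[n:]
        else:
            chunked_lst.append(lst.pop(0))
    chunked_lst.extend(lst)
    return chunked_lst

res = chunkify(['1', '+', '1', '+', '1'], 2, {('1', '+')})
print(res)  # → [{'_': ['1', '+']}, {'_': ['1', '+']}, '1']
```

Each `{'_': [...]}` dict marks one occurrence of a detected chunk, which the prefix tree acceptor later treats as a single item.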
xxxxxxxxxx
def identify_chunks(my_lsts):
    # initialize
    all_chunks = {}
    maximum = max(len(lst) for lst in my_lsts)
    for i in range(1, maximum//2 + 1):
        all_chunks[i] = set()
    # First, identify chunks in each list.
    for lst in my_lsts:
        for i in range(1, maximum//2 + 1):
            chunks = detect_chunks(i, lst)
            all_chunks[i] |= chunks
    # Then, chunkify
    new_lsts = []
    for lst in my_lsts:
        for i in range(1, maximum//2 + 1):
            chunks = all_chunks[i]
            lst = chunkify(lst, i, chunks)
        new_lsts.append(lst)
    return new_lsts

##### Prefix tree acceptor

The prefix tree acceptor is a way to represent positive data. The `Node` class holds a single node in the prefix tree acceptor.
xxxxxxxxxx
class Node:
    # Each tree node gets its unique id.
    _uid = 0

    def __init__(self, item):
        # self.repeats = False
        self.count = 1  # how many repetitions.
        self.counters = set()
        self.last = False
        self.children = []
        self.item = item
        self.uid = Node._uid
        Node._uid += 1

    def update_counters(self):
        self.counters.add(self.count)
        self.count = 0
        for c in self.children:
            c.update_counters()

    def __repr__(self):
        return str(self.to_json())

    def __str__(self):
        return "(%s, [%s])" % (self.item, ' '.join([str(i) for i in self.children]))

    def to_json(self):
        s = ("(%s)" % ' '.join(self.item['_'])) if isinstance(self.item, dict) else str(self.item)
        return (s, tuple(self.counters), [i.to_json() for i in self.children])

    def inc_count(self):
        self.count += 1

    def add_ref(self):
        self.count = 1

    def get_child(self, c):
        for i in self.children:
            if i.item == c:
                return i
        return None

    def add_child(self, c):
        # first check if it is the current node. If it is, increment
        # count, and return ourselves.
        if c == self.item:
            self.inc_count()
            return self
        else:
            # check if it is one of the children. If it is a child, then
            # preserve its original count.
            nc = self.get_child(c)
            if nc is None:
                nc = Node(c)
                self.children.append(nc)
            else:
                nc.add_ref()
            return nc

xxxxxxxxxx
def update_tree(lst_, root):
    lst = list(lst_)
    branch = root
    while lst:
        first, *lst = lst
        branch = branch.add_child(first)
    branch.last = True
    return root

def create_tree_with_lsts(lsts):
    Node._uid = 0
    root = Node(None)
    for lst in lsts:
        root.count = 1  # there is at least one element.
        update_tree(lst, root)
    root.update_counters()
    return root

def get_star(node, key):
    if node.item is None:
        return ''
    if isinstance(node.item, dict):
        # take care of counters
        elements = node.item['_']
        my_key = "<%s-%d-s>" % (key, node.uid)
        alts = [elements]
        if len(node.counters) > 1:
            # repetition
            alts.append(elements + [my_key])
        return [my_key], {my_key: alts}
    else:
        return [str(node.item)], {}

def node_to_grammar(node, grammar, key):
    rule = []
    alts = [rule]
    if node.uid == 0:
        my_key = "<%s>" % key
    else:
        my_key = "<%s-%d>" % (key, node.uid)
    grammar[my_key] = alts
    if node.item is not None:
        mk, g = get_star(node, key)
        rule.extend(mk)
        grammar.update(g)
    # is the node last?
    if node.last:
        assert node.item is not None
        # add a duplicate rule that ends here.
        ending_rule = list(rule)
        # if there are no children, the current rule is
        # any way ending.
        if node.children:
            alts.append(ending_rule)
    if node.children:
        if len(node.children) > 1:
            my_ckey = "<%s-%d-c>" % (key, node.uid)
            rule.append(my_ckey)
            grammar[my_ckey] = [["<%s-%d>" % (key, c.uid)] for c in node.children]
        else:
            my_ckey = "<%s-%d>" % (key, node.children[0].uid)
            rule.append(my_ckey)
    else:
        pass
    for c in node.children:
        node_to_grammar(c, grammar, key)
    return grammar

def generate_grammar(lists, key):
    lsts = identify_chunks(lists)
    tree = create_tree_with_lsts(lsts)
    grammar = {}
    node_to_grammar(tree, grammar, key)
    return grammar

Given a rule, determine the abstraction for it.
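The structure built by the `Node` class above can be illustrated with a toy prefix tree acceptor over nested dicts (a deliberately simplified sketch: no repetition counters, `'$'` is a made-up end marker):

```python
def insert(root, s):
    # walk/extend the shared prefix path for one positive sample.
    node = root
    for ch in s:
        node = node.setdefault(ch, {})
    node['$'] = {}  # mark that a sample may end here

root = {}
for sample in ['ab', 'abab']:
    insert(root, sample)
print(root)
# both samples share the prefix 'ab'; 'abab' continues past the end marker.
```

The shared prefix `ab` is stored once; the branch point where `abab` continues is exactly where the real acceptor records a repetition.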
xxxxxxxxxx
def collapse_alts(rules, k):
    ss = [[str(r) for r in rule] for rule in rules]
    x = generate_grammar(ss, k[1:-1])
    return x

xxxxxxxxxx
def collapse_rules(grammar):
    r_grammar = {}
    for k in grammar:
        new_grammar = collapse_alts(grammar[k], k)
        # merge the new_grammar with r_grammar
        # we know none of the keys exist in r_grammar because
        # new keys are k prefixed.
        for k_ in new_grammar:
            r_grammar[k_] = new_grammar[k_]
    return r_grammar

xxxxxxxxxx
%top collapsed_calc_grammar = collapse_rules(ne_calc_grammar)
%top show_grammar(collapsed_calc_grammar)

xxxxxxxxxx
%%top
# [(
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(ne_mathexpr_grammar), start_symbol='<START>')
for i in range(10):
    print(gf.fuzz())
# )]

xxxxxxxxxx
%top collapsed_mathexpr_grammar = collapse_rules(ne_mathexpr_grammar)
%top show_grammar(collapsed_mathexpr_grammar)

xxxxxxxxxx
%%top
# [(
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(collapsed_mathexpr_grammar), start_symbol='<START>')
for i in range(10):
    print(gf.fuzz())
# )]

xxxxxxxxxx
%%top
# [(
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(collapsed_calc_grammar), start_symbol='<START>')
for i in range(10):
    print(gf.fuzz())
# )]

xxxxxxxxxx
def convert_spaces(grammar):
    keys = {key: key.replace(' ', '_') for key in grammar}
    new_grammar = {}
    for key in grammar:
        new_alt = []
        for rule in grammar[key]:
            new_rule = []
            for t in rule:
                for k in keys:
                    t = t.replace(k, keys[k])
                new_rule.append(t)
            new_alt.append(''.join(new_rule))
        new_grammar[keys[key]] = new_alt
    return new_grammar

xxxxxxxxxx
%top calc_grammar = convert_spaces(collapsed_calc_grammar)
%top show_grammar(calc_grammar, canonical=False)

xxxxxxxxxx
from fuzzingbook import GrammarFuzzer, GrammarMiner, Parser

xxxxxxxxxx
%top gf = GrammarFuzzer.GrammarFuzzer(calc_grammar, start_symbol='<START>')

xxxxxxxxxx
%%top
# [(
for i in range(10):
    print(gf.fuzz())
# )]

xxxxxxxxxx
def first_in_chain(token, chain):
    while True:
        if token in chain:
            token = chain[token]
        else:
            break
    return token

Return a new symbol for `grammar` based on 
`symbol_name`.
xxxxxxxxxx
def new_symbol(grammar, symbol_name="<symbol>"):
    if symbol_name not in grammar:
        return symbol_name
    count = 1
    while True:
        tentative_symbol_name = symbol_name[:-1] + "-" + repr(count) + ">"
        if tentative_symbol_name not in grammar:
            return tentative_symbol_name
        count += 1

Replace keys that have a single-token definition with the token in the definition.
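`new_symbol()` appends `-1`, `-2`, … to the requested name until it finds one not already present in the grammar (the function is repeated so the cell runs standalone):

```python
def new_symbol(grammar, symbol_name="<symbol>"):
    if symbol_name not in grammar:
        return symbol_name
    count = 1
    while True:
        # insert "-<count>" just before the closing '>'.
        tentative_symbol_name = symbol_name[:-1] + "-" + repr(count) + ">"
        if tentative_symbol_name not in grammar:
            return tentative_symbol_name
        count += 1

g = {'<expr>': [], '<expr-1>': []}
print(new_symbol(g, '<expr>'))  # → '<expr-2>'
print(new_symbol(g, '<term>'))  # → '<term>' (already free)
```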
xxxxxxxxxx
def replacement_candidates(grammar):
    to_replace = {}
    for k in grammar:
        if len(grammar[k]) != 1:
            continue
        if k in {'<START>', '<main>'}:
            continue
        rule = grammar[k][0]
        res = re.findall(RE_NONTERMINAL, rule)
        if len(res) == 1:
            if len(res[0]) != len(rule):
                continue
            to_replace[k] = first_in_chain(res[0], to_replace)
        elif len(res) == 0:
            to_replace[k] = first_in_chain(rule, to_replace)
        else:
            continue  # more than one.
    return to_replace

xxxxxxxxxx
def replace_key_by_new_key(grammar, keys_to_replace):
    new_grammar = {}
    for key in grammar:
        new_rules = []
        for rule in grammar[key]:
            for k in keys_to_replace:
                rule = rule.replace(k, keys_to_replace[k])
            new_rules.append(rule)
        new_grammar[keys_to_replace.get(key, key)] = new_rules
    assert len(grammar) == len(new_grammar)
    return new_grammar

xxxxxxxxxx
def replace_key_by_key(grammar, keys_to_replace):
    new_grammar = {}
    for key in grammar:
        if key in keys_to_replace:
            continue
        new_rules = []
        for rule in grammar[key]:
            for k in keys_to_replace:
                rule = rule.replace(k, keys_to_replace[k])
            new_rules.append(rule)
        new_grammar[key] = new_rules
    return new_grammar

xxxxxxxxxx
def remove_single_entries(grammar):
    keys_to_replace = replacement_candidates(grammar)
    return replace_key_by_key(grammar, keys_to_replace)

Remove keys that have similar rules.
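The candidate selection above can be sketched on a small grammar. The helper below (`single_token_candidates`, a made-up name) is a simplified variant of `replacement_candidates()` without the `first_in_chain()` chain-following step:

```python
import re
RE_NONTERMINAL = re.compile(r'(<[^<> ]*>)')

def single_token_candidates(grammar):
    # a key qualifies if its single rule is exactly one nonterminal,
    # or contains no nonterminal at all (a plain terminal string).
    to_replace = {}
    for k in grammar:
        if len(grammar[k]) != 1 or k in {'<START>', '<main>'}:
            continue
        rule = grammar[k][0]
        nts = re.findall(RE_NONTERMINAL, rule)
        if len(nts) == 1 and len(nts[0]) == len(rule):
            to_replace[k] = nts[0]
        elif not nts:
            to_replace[k] = rule
    return to_replace

g = {'<START>': ['<expr>'], '<expr>': ['<digit>'], '<digit>': ['1', '2']}
print(single_token_candidates(g))  # → {'<expr>': '<digit>'}
```

`<START>` is protected by name, and `<digit>` survives because it has two alternatives; only the pass-through key `<expr>` is inlined.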
xxxxxxxxxx
def collect_duplicate_rule_keys(grammar):
    collect = {}
    for k in grammar:
        salt = str(sorted(grammar[k]))
        if salt not in collect:
            collect[salt] = (k, set())
        else:
            collect[salt][1].add(k)
    return collect

xxxxxxxxxx
def remove_duplicate_rule_keys(grammar):
    g = grammar
    while True:
        collect = collect_duplicate_rule_keys(g)
        keys_to_replace = {}
        for salt in collect:
            k, st = collect[salt]
            for s in st:
                keys_to_replace[s] = k
        if not keys_to_replace:
            break
        g = replace_key_by_key(g, keys_to_replace)
    return g

Remove all the control flow vestiges from names, and simply name them sequentially.
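The grouping step can be seen in isolation. The helper below (`duplicate_rule_groups`, a made-up name mirroring `collect_duplicate_rule_keys()` above) maps each redundant key to the first key that has the same rule set, ignoring rule order:

```python
def duplicate_rule_groups(grammar):
    seen, dups = {}, {}
    for k in sorted(grammar):
        # sorted rules act as a canonical fingerprint of the definition.
        salt = str(sorted(grammar[k]))
        if salt in seen:
            dups[k] = seen[salt]
        else:
            seen[salt] = k
    return dups

g = {'<a>': ['1', '2'], '<b>': ['2', '1'], '<c>': ['3']}
print(duplicate_rule_groups(g))  # → {'<b>': '<a>'}
```

`<b>` defines the same language as `<a>` (the same alternatives in a different order), so it can be replaced by `<a>` everywhere.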
xxxxxxxxxx
import copy

def collect_replacement_keys(grammar):
    g = copy.deepcopy(grammar)
    to_replace = {}
    for k in grammar:
        if ':' in k:
            first, rest = k.split(':')
            sym = new_symbol(g, symbol_name=first + '>')
            assert sym not in g
            g[sym] = None
            to_replace[k] = sym
        else:
            continue
    return to_replace

xxxxxxxxxx
def cleanup_tokens(grammar):
    keys_to_replace = collect_replacement_keys(grammar)
    g = replace_key_by_new_key(grammar, keys_to_replace)
    return g

xxxxxxxxxx
def replaceAngular(grammar):
    new_g = {}
    replaced = False
    for k in grammar:
        new_rules = []
        for rule in grammar[k]:
            new_rule = rule.replace('<>', '<openA><closeA>').replace('</>', '<openA>/<closeA>')
            if rule != new_rule:
                replaced = True
            new_rules.append(new_rule)
        new_g[k] = new_rules
    if replaced:
        new_g['<openA>'] = ['<']
        new_g['<closeA>'] = ['>']
    return new_g

Remove keys that are referred to only from a single rule, and which have a single alternative. Important: this can't work on the canonical representation. First, given a key, we figure out its distance to `<START>`.
This is different from `remove_single_entries()` in that, there, we do not care if the key is being used multiple times. Here, we only replace keys that are referred to only once.
xxxxxxxxxx
import math

xxxxxxxxxx
def len_to_start(item, parents, seen=None):
    if seen is None:
        seen = set()
    if item in seen:
        return math.inf
    seen.add(item)
    if item == '<START>':
        return 0
    else:
        return 1 + min(len_to_start(p, parents, seen) for p in parents[item])

xxxxxxxxxx
def order_by_length_to_start(items, parents):
    return sorted(items, key=lambda i: len_to_start(i, parents))

Next, we generate a map of `child -> [parents]`.
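On a small parent map, `len_to_start()` counts the parent links back to `<START>`, and returns infinity when the only path is a cycle (the function is repeated so the cell runs standalone):

```python
import math

def len_to_start(item, parents, seen=None):
    # distance, in parent links, from `item` to <START>; inf on cycles.
    if seen is None:
        seen = set()
    if item in seen:
        return math.inf
    seen.add(item)
    if item == '<START>':
        return 0
    else:
        return 1 + min(len_to_start(p, parents, seen) for p in parents[item])

parents = {'<expr>': ['<START>'], '<digit>': ['<expr>']}
print(len_to_start('<digit>', parents))  # → 2

cyclic = {'<a>': ['<a>']}
print(len_to_start('<a>', cyclic))       # → inf
```

Ordering keys by this distance lets the cleanup inline definitions closest to the start symbol first.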
xxxxxxxxxx
def id_parents(grammar, key, seen=None, parents=None):
    if parents is None:
        parents = {}
        seen = set()
    if key in seen:
        return
    seen.add(key)
    for rule in grammar[key]:
        res = re.findall(RE_NONTERMINAL, rule)
        for token in res:
            if token.startswith('<') and token.endswith('>'):
                if token not in parents:
                    parents[token] = list()
                parents[token].append(key)
    for ckey in {i for i in grammar if i not in seen}:
        id_parents(grammar, ckey, seen, parents)
    return parents

Now, all together.
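The map produced can be illustrated with an iterative sketch (`parent_map`, a made-up name) of the `child -> [parents]` construction that `id_parents()` above performs recursively:

```python
import re
RE_NONTERMINAL = re.compile(r'(<[^<> ]*>)')

def parent_map(grammar):
    parents = {}
    for key in grammar:
        for rule in grammar[key]:
            # every nonterminal used in a rule has `key` as one parent.
            for token in re.findall(RE_NONTERMINAL, rule):
                parents.setdefault(token, []).append(key)
    return parents

g = {'<START>': ['<expr>'], '<expr>': ['<expr>+<digit>', '<digit>']}
print(parent_map(g))
# → {'<expr>': ['<START>', '<expr>'], '<digit>': ['<expr>', '<expr>']}
```

A key that appears only once in this map is referenced from a single rule, which is exactly the condition `remove_single_alts()` checks.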
xxxxxxxxxx
def remove_single_alts(grammar, start_symbol='<START>'):
    single_alts = {p for p in grammar if len(grammar[p]) == 1 and p != start_symbol}
    child_parent_map = id_parents(grammar, start_symbol)
    single_refs = {p: child_parent_map[p] for p in single_alts if len(child_parent_map[p]) <= 1}
    keys_to_replace = {p: grammar[p][0] for p in order_by_length_to_start(single_refs, child_parent_map)}
    g = replace_key_by_key(grammar, keys_to_replace)
    return g

xxxxxxxxxx
import os
import hashlib

xxxxxxxxxx
def accio_grammar(fname, src, samples, cache=True):
    hash_id = hashlib.md5(json.dumps(samples).encode()).hexdigest()
    cache_file = "build/%s_%s_generalized_tree.json" % (fname, hash_id)
    if os.path.exists(cache_file) and cache:
        with open(cache_file) as f:
            generalized_tree = json.load(f)
    else:
        # regenerate the program
        program_src[fname] = src
        with open('subjects/%s' % fname, 'w+') as f:
            print(src, file=f)
        resrc = rewrite(src, fname)
        with open('build/%s' % fname, 'w+') as f:
            print(resrc, file=f)
        os.makedirs('samples/%s' % fname, exist_ok=True)
        sample_files = {("samples/%s/%d.csv" % (fname, i)): s for i, s in enumerate(samples)}
        for k in sample_files:
            with open(k, 'w+') as f:
                print(sample_files[k], file=f)
        call_trace = []
        for i in sample_files:
            thash_id = hashlib.md5(json.dumps(sample_files[i]).encode()).hexdigest()
            trace_cache_file = "build/%s_%s_trace.json" % (fname, thash_id)
            if os.path.exists(trace_cache_file) and cache:
                with open(trace_cache_file) as f:
                    my_tree = f.read()
            else:
                my_tree = do(["python", "./build/%s" % fname, i]).stdout
                with open(trace_cache_file, 'w+') as f:
                    print(my_tree, file=f)
            call_trace.append(json.loads(my_tree)[0])
        mined_tree = miner(call_trace)
        generalized_tree = generalize_iter(mined_tree)  # costly data structure.
        with open(cache_file, 'w+') as f:
            json.dump(generalized_tree, f)
    g = convert_to_grammar(generalized_tree)
    with open('build/%s_grammar_1.json' % fname, 'w+') as f:
        json.dump(g, f)
    g = check_empty_rules(g)
    with open('build/%s_grammar_2.json' % fname, 'w+') as f:
        json.dump(g, f)
    g = collapse_rules(g)  # <- regex learner
    with open('build/%s_grammar_3.json' % fname, 'w+') as f:
        json.dump(g, f)
    g = convert_spaces(g)
    with open('build/%s_grammar_4.json' % fname, 'w+') as f:
        json.dump(g, f)
    e = remove_single_alts(cleanup_tokens(remove_duplicate_rule_keys(remove_single_entries(g))))
    e = show_grammar(e, canonical=False)
    with open('build/%s_grammar.json' % fname, 'w+') as f:
        json.dump(e, f)
    return e

xxxxxxxxxx
%top calc_grammar = accio_grammar('calculator.py', VARS['calc_src'], ['(1+2)-2', '11'])

xxxxxxxxxx
%top calc_grammar

xxxxxxxxxx
%top gf = GrammarFuzzer.GrammarFuzzer(calc_grammar, start_symbol='<START>')

xxxxxxxxxx
%%top
# [(
for i in range(10):
    print(gf.fuzz())
# )]

## Libraries

We need a few instrumented supporting libraries.
xxxxxxxxxx%%var myio_src# [(r"""File-like objects that read from or write to a string buffer.This implements (nearly) all stdio methods.f = StringIO() # ready for writingf = StringIO(buf) # ready for readingf.close() # explicitly release resources heldflag = f.isatty() # always falsepos = f.tell() # get current positionf.seek(pos) # set current positionf.seek(pos, mode) # mode 0: absolute; 1: relative; 2: relative to EOFbuf = f.read() # read until EOFbuf = f.read(n) # read up to n bytesbuf = f.readline() # read until end of line ('\n') or EOFlist = f.readlines()# list of f.readline() results until EOFf.truncate([size]) # truncate file at to at most size (default: current pos)f.write(buf) # write at current positionf.writelines(list) # for line in list: f.write(line)f.getvalue() # return whole file's contents as a stringNotes:- Using a real file is often faster (but less convenient).- There's also a much faster implementation in C, called cStringIO, but it's not subclassable.- fileno() is left unimplemented so that code which uses it triggers an exception early.- Seeking far beyond EOF and then writing will insert real null bytes that occupy space in the buffer.- There's a simple test set (see end of this file)."""try: from errno import EINVALexcept ImportError: EINVAL = 22__all__ = ["StringIO"]def _complain_ifclosed(closed): if closed: raise ValueError("I/O operation on closed file")class StringIO: """class StringIO([buffer]) When a StringIO object is created, it can be initialized to an existing string by passing the string to the constructor. If no string is given, the StringIO will start empty. The StringIO object can accept either Unicode or 8-bit strings, but mixing the two may take some care. If both are used, 8-bit strings that cannot be interpreted as 7-bit ASCII (that use the 8th bit) will cause a UnicodeError to be raised when getvalue() is called. 
""" def __init__(self, buf = ''): # Force self.buf to be a string or unicode if not isinstance(buf, str): buf = str(buf) self.buf = buf self.len = len(buf) self.buflist = [] self.pos = 0 self.closed = False self.softspace = 0 def __iter__(self): return self def __next__(self): """A file object is its own iterator, for example iter(f) returns f (unless f is closed). When a file is used as an iterator, typically in a for loop (for example, for line in f: print line), the next() method is called repeatedly. This method returns the next input line, or raises StopIteration when EOF is hit. """ _complain_ifclosed(self.closed) r = self.readline() if not r: raise StopIteration return r def close(self): """Free the memory buffer. """ if not self.closed: self.closed = True del self.buf, self.pos def isatty(self): """Returns False because StringIO objects are not connected to a tty-like device. """ _complain_ifclosed(self.closed) return False def seek(self, pos, mode = 0): """Set the file's current position. The mode argument is optional and defaults to 0 (absolute file positioning); other values are 1 (seek relative to the current position) and 2 (seek relative to the file's end). There is no return value. """ _complain_ifclosed(self.closed) if self.buflist: self.buf += ''.join(self.buflist) self.buflist = [] if mode == 1: pos += self.pos elif mode == 2: pos += self.len self.pos = max(0, pos) def tell(self): """Return the file's current position.""" _complain_ifclosed(self.closed) return self.pos def read(self, n = -1): """Read at most size bytes from the file (less if the read hits EOF before obtaining size bytes). If the size argument is negative or omitted, read all data until EOF is reached. The bytes are returned as a string object. An empty string is returned when EOF is encountered immediately. 
""" _complain_ifclosed(self.closed) if self.buflist: self.buf += ''.join(self.buflist) self.buflist = [] if n is None or n < 0: newpos = self.len else: newpos = min(self.pos+n, self.len) r = self.buf[self.pos:newpos] self.pos = newpos return r def readline(self, length=None): r"""Read one entire line from the file. A trailing newline character is kept in the string (but may be absent when a file ends with an incomplete line). If the size argument is present and non-negative, it is a maximum byte count (including the trailing newline) and an incomplete line may be returned. An empty string is returned only when EOF is encountered immediately. Note: Unlike stdio's fgets(), the returned string contains null characters ('\0') if they occurred in the input. """ _complain_ifclosed(self.closed) if self.buflist: self.buf += ''.join(self.buflist) self.buflist = [] i = self.buf.find('\n', self.pos) if i < 0: newpos = self.len else: newpos = i+1 if length is not None and length > 0: if self.pos + length < newpos: newpos = self.pos + length r = self.buf[self.pos:newpos] self.pos = newpos return r def readlines(self, sizehint = 0): """Read until EOF using readline() and return a list containing the lines thus read. If the optional sizehint argument is present, instead of reading up to EOF, whole lines totalling approximately sizehint bytes (or more to accommodate a final whole line). """ total = 0 lines = [] line = self.readline() while line: lines.append(line) total += len(line) if 0 < sizehint <= total: break line = self.readline() return lines def truncate(self, size=None): """Truncate the file's size. If the optional size argument is present, the file is truncated to (at most) that size. The size defaults to the current position. The current file position is not changed unless the position is beyond the new file size. If the specified size exceeds the file's current size, the file remains unchanged. 
""" _complain_ifclosed(self.closed) if size is None: size = self.pos elif size < 0: raise IOError(EINVAL, "Negative size not allowed") elif size < self.pos: self.pos = size self.buf = self.getvalue()[:size] self.len = size def write(self, s): """Write a string to the file. There is no return value. """ _complain_ifclosed(self.closed) if not s: return # Force s to be a string or unicode if not isinstance(s, str): s = str(s) spos = self.pos slen = self.len if spos == slen: self.buflist.append(s) self.len = self.pos = spos + len(s) return if spos > slen: self.buflist.append('\0'*(spos - slen)) slen = spos newpos = spos + len(s) if spos < slen: if self.buflist: self.buf += ''.join(self.buflist) self.buflist = [self.buf[:spos], s, self.buf[newpos:]] self.buf = '' if newpos > slen: slen = newpos else: self.buflist.append(s) slen = newpos self.len = slen self.pos = newpos def writelines(self, iterable): """Write a sequence of strings to the file. The sequence can be any iterable object producing strings, typically a list of strings. There is no return value. (The name is intended to match readlines(); writelines() does not add line separators.) """ write = self.write for line in iterable: write(line) def flush(self): """Flush the internal buffer """ _complain_ifclosed(self.closed) def getvalue(self): """ Retrieve the entire contents of the "file" at any time before the StringIO object's close() method is called. The StringIO object can accept either Unicode or 8-bit strings, but mixing the two may take some care. If both are used, 8-bit strings that cannot be interpreted as 7-bit ASCII (that use the 8th bit) will cause a UnicodeError to be raised when getvalue() is called. 
""" if self.buflist: self.buf += ''.join(self.buflist) self.buflist = [] return self.buf# )]xxxxxxxxxx# [(with open('build/myio.py', 'w+') as f: print(VARS['myio_src'], file=f)# )]xxxxxxxxxx%%var mylex_src# [("""A lexical analyzer class for simple shell-like syntaxes."""# Module and documentation by Eric S. Raymond, 21 Dec 1998# Input stacking and error message cleanup added by ESR, March 2000# push_source() and pop_source() made explicit by ESR, January 2001.# Posix compliance, split(), string arguments, and# iterator interface by Gustavo Niemeyer, April 2003.# changes to tokenize more like Posix shells by Vinay Sajip, July 2016.import osimport reimport sysfrom collections import dequefrom myio import StringIO__all__ = ["shlex", "split", "quote"]class shlex: "A lexical analyzer class for simple shell-like syntaxes." def __init__(self, instream=None, infile=None, posix=False, punctuation_chars=False): if isinstance(instream, str): instream = StringIO(instream) if instream is not None: self.instream = instream self.infile = infile else: self.instream = sys.stdin self.infile = None self.posix = posix if posix: self.eof = None else: self.eof = '' self.commenters = '#' self.wordchars = ('abcdfeghijklmnopqrstuvwxyz' 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_') if self.posix: self.wordchars += ('ßàáâãäåæçèéêëìíîïðñòóôõöøùúûüýþÿ' 'ÀÁÂÃÄÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖØÙÚÛÜÝÞ') self.whitespace = ' \t\r\n' self.whitespace_split = False self.quotes = '\'"' self.escape = '\\' self.escapedquotes = '"' self.state = ' ' self.pushback = deque() self.lineno = 1 self.debug = 0 self.token = '' self.filestack = deque() self.source = None if not punctuation_chars: punctuation_chars = '' elif punctuation_chars is True: punctuation_chars = '();<>|&' self.punctuation_chars = punctuation_chars if punctuation_chars: # _pushback_chars is a push back queue used by lookahead logic self._pushback_chars = deque() # these chars added because allowed in file names, args, wildcards self.wordchars += '~-./*?=' 
#remove any punctuation chars from wordchars t = self.wordchars.maketrans(dict.fromkeys(punctuation_chars)) self.wordchars = self.wordchars.translate(t) def push_token(self, tok): "Push a token onto the stack popped by the get_token method" if self.debug >= 1: print("shlex: pushing token " + repr(tok)) self.pushback.appendleft(tok) def push_source(self, newstream, newfile=None): "Push an input source onto the lexer's input source stack." if isinstance(newstream, str): newstream = StringIO(newstream) self.filestack.appendleft((self.infile, self.instream, self.lineno)) self.infile = newfile self.instream = newstream self.lineno = 1 if self.debug: if newfile is not None: print('shlex: pushing to file %s' % (self.infile,)) else: print('shlex: pushing to stream %s' % (self.instream,)) def pop_source(self): "Pop the input source stack." self.instream.close() (self.infile, self.instream, self.lineno) = self.filestack.popleft() if self.debug: print('shlex: popping to %s, line %d' \ % (self.instream, self.lineno)) self.state = ' ' def get_token(self): "Get a token from the input stream (or from stack if it's nonempty)" if self.pushback: tok = self.pushback.popleft() if self.debug >= 1: print("shlex: popping token " + repr(tok)) return tok # No pushback. Get a token. raw = self.read_token() # Handle inclusions if self.source is not None: while raw == self.source: spec = self.sourcehook(self.read_token()) if spec: (newfile, newstream) = spec self.push_source(newstream, newfile) raw = self.get_token() # Maybe we got EOF instead? 
while raw == self.eof: if not self.filestack: return self.eof else: self.pop_source() raw = self.get_token() # Neither inclusion nor EOF if self.debug >= 1: if raw != self.eof: print("shlex: token=" + repr(raw)) else: print("shlex: token=EOF") return raw def read_token(self): quoted = False escapedstate = ' ' while True: if self.punctuation_chars and self._pushback_chars: nextchar = self._pushback_chars.pop() else: nextchar = self.instream.read(1) if nextchar == '\n': self.lineno += 1 if self.debug >= 3: print("shlex: in state %r I see character: %r" % (self.state, nextchar)) if self.state is None: self.token = '' # past end of file break elif self.state == ' ': if not nextchar: self.state = None # end of file break elif nextchar in self.whitespace: if self.debug >= 2: print("shlex: I see whitespace in whitespace state") if self.token or (self.posix and quoted): break # emit current token else: continue elif nextchar in self.commenters: self.instream.readline() self.lineno += 1 elif self.posix and nextchar in self.escape: escapedstate = 'a' self.state = nextchar elif nextchar in self.wordchars: self.token = nextchar self.state = 'a' elif nextchar in self.punctuation_chars: self.token = nextchar self.state = 'c' elif nextchar in self.quotes: if not self.posix: self.token = nextchar self.state = nextchar elif self.whitespace_split: self.token = nextchar self.state = 'a' else: self.token = nextchar if self.token or (self.posix and quoted): break # emit current token else: continue elif self.state in self.quotes: quoted = True if not nextchar: # end of file if self.debug >= 2: print("shlex: I see EOF in quotes state") # XXX what error should be raised here? 
raise ValueError("No closing quotation") if nextchar == self.state: if not self.posix: self.token += nextchar self.state = ' ' break else: self.state = 'a' elif (self.posix and nextchar in self.escape and self.state in self.escapedquotes): escapedstate = self.state self.state = nextchar else: self.token += nextchar elif self.state in self.escape: if not nextchar: # end of file if self.debug >= 2: print("shlex: I see EOF in escape state") # XXX what error should be raised here? raise ValueError("No escaped character") # In posix shells, only the quote itself or the escape # character may be escaped within quotes. if (escapedstate in self.quotes and nextchar != self.state and nextchar != escapedstate): self.token += self.state self.token += nextchar self.state = escapedstate elif self.state in ('a', 'c'): if not nextchar: self.state = None # end of file break elif nextchar in self.whitespace: if self.debug >= 2: print("shlex: I see whitespace in word state") self.state = ' ' if self.token or (self.posix and quoted): break # emit current token else: continue elif nextchar in self.commenters: self.instream.readline() self.lineno += 1 if self.posix: self.state = ' ' if self.token or (self.posix and quoted): break # emit current token else: continue elif self.state == 'c': if nextchar in self.punctuation_chars: self.token += nextchar else: if nextchar not in self.whitespace: self._pushback_chars.append(nextchar) self.state = ' ' break elif self.posix and nextchar in self.quotes: self.state = nextchar elif self.posix and nextchar in self.escape: escapedstate = 'a' self.state = nextchar elif (nextchar in self.wordchars or nextchar in self.quotes or self.whitespace_split): self.token += nextchar else: if self.punctuation_chars: self._pushback_chars.append(nextchar) else: self.pushback.appendleft(nextchar) if self.debug >= 2: print("shlex: I see punctuation in word state") self.state = ' ' if self.token or (self.posix and quoted): break # emit current token else: continue 
result = self.token self.token = '' if self.posix and not quoted and result == '': result = None if self.debug > 1: if result: print("shlex: raw token=" + repr(result)) else: print("shlex: raw token=EOF") return result def sourcehook(self, newfile): "Hook called on a filename to be sourced." if newfile[0] == '"': newfile = newfile[1:-1] # This implements cpp-like semantics for relative-path inclusion. if isinstance(self.infile, str) and not os.path.isabs(newfile): newfile = os.path.join(os.path.dirname(self.infile), newfile) return (newfile, open(newfile, "r")) def error_leader(self, infile=None, lineno=None): "Emit a C-compiler-like, Emacs-friendly error-message leader." if infile is None: infile = self.infile if lineno is None: lineno = self.lineno return "\"%s\", line %d: " % (infile, lineno) def __iter__(self): return self def __next__(self): token = self.get_token() if token == self.eof: raise StopIteration return tokendef split(s, comments=False, posix=True): lex = shlex(s, posix=posix) lex.whitespace_split = True if not comments: lex.commenters = '' return list(lex)_find_unsafe = re.compile(r'[^\w@%+=:,./-]', re.ASCII).searchdef quote(s): """Return a shell-escaped version of the string *s*.""" if not s: return "''" if _find_unsafe(s) is None: return s # use single quotes, and put single quotes into double quotes # the string $'b is then quoted as '$'"'"'b' return "'" + s.replace("'", "'\"'\"'") + "'"def _print_tokens(lexer): while 1: tt = lexer.get_token() if not tt: break print("Token: " + repr(tt))# )]xxxxxxxxxx# [(with open('build/mylex.py', 'w+') as f: print(VARS['mylex_src'], file=f)# )]xxxxxxxxxximport fuzzingbookxxxxxxxxxxassert os.path.isfile('json.tar.gz') # for microjson validationxxxxxxxxxxMax_Precision = 1000Max_Recall = 1000Autogram = {}AutogramFuzz = {}AutogramGrammar = {}Mimid = {}MimidFuzz = {}MimidGrammar = {}MaxTimeout = 60*60 # 60 minutesMaxParseTimeout = 60*5CHECK = {'cgidecode','calculator', 'mathexpr', 'urlparse', 'netrc', 
'microjson'}reset_generalizer()xxxxxxxxxxdef recover_grammar_with_taints(name, src, samples): header = '''import fuzzingbook.GrammarMinerfrom fuzzingbook.GrammarMiner import Tracerfrom fuzzingbook.InformationFlow import ostrfrom fuzzingbook.GrammarMiner import TaintedScopedGrammarMiner as TSGMfrom fuzzingbook.GrammarMiner import readableimport subjects.autogram_%simport fuzzingbookclass ostr_new(ostr): def __new__(cls, value, *args, **kw): return str.__new__(cls, value) def __init__(self, value, taint=None, origin=None, **kwargs): self.taint = taint if origin is None: origin = ostr.DEFAULT_ORIGIN if isinstance(origin, int): self.origin = list(range(origin, origin + len(self))) else: self.origin = origin #assert len(self.origin) == len(self) <-- bug fix here.class ostr_new(ostr_new): def create(self, res, origin=None): return ostr_new(res, taint=self.taint, origin=origin) def __repr__(self): # bugfix here. return str.__repr__(self) def recover_grammar_with_taints(fn, inputs, **kwargs): miner = TSGM() for inputstr in inputs: with Tracer(ostr_new(inputstr), **kwargs) as tracer: fn(tracer.my_input) miner.update_grammar(tracer.my_input, tracer.trace) return readable(miner.clean_grammar())def replaceAngular(grammar): # special handling for Autogram because it does not look for <> and </> # in rules, which messes up with parsing. 
new_g = {} replaced = False for k in grammar: new_rules = [] for rule in grammar[k]: new_rule = rule.replace('<>', '<openA><closeA>').replace('</>', '<openA>/<closeA>').replace('<lambda>','<openA>lambda<closeA>') if rule != new_rule: replaced = True new_rules.append(new_rule) new_g[k] = new_rules if replaced: new_g['<openA>'] = ['<'] new_g['<closeA>'] = ['>'] return new_gdef replace_start(grammar): assert '<start>' in grammar start = grammar['<start>'] del grammar['<start>'] grammar['<START>'] = start return replaceAngular(grammar) samples = [i.strip() for i in [%s] if i.strip()]import jsonautogram_grammar_t = recover_grammar_with_taints(subjects.autogram_%s.main, samples)print(json.dumps(replace_start(autogram_grammar_t)))''' mod_name = name.replace('.py','') with open('./subjects/autogram_%s' % name, 'w+') as f: print(src, file=f) with open('./build/autogram_%s' % name, 'w+') as f: print(header % (mod_name, ',\n'.join([repr(i) for i in samples]), mod_name), file=f) with ExpectTimeout(MaxTimeout): result = do(["python","./build/autogram_%s" % name], env={'PYTHONPATH':'.'}, log=True) if result.stderr.strip(): print(result.stderr, file=sys.stderr) return show_grammar(json.loads(result.stdout), canonical=False) return {}xxxxxxxxxxfrom fuzzingbook.Parser import IterativeEarleyParser### Check RecallHow many of the *valid* inputs from the golden grammar can be recognized by a parser using our grammar?
xxxxxxxxxxdef check_recall(golden_grammar, my_grammar, validator, maximum=Max_Recall, log=False): my_count = maximum ie = IterativeEarleyParser(my_grammar, start_symbol='<START>') golden = GrammarFuzzer.GrammarFuzzer(golden_grammar, start_symbol='<START>') success = 0 while my_count != 0: src = golden.fuzz() try: validator(src) my_count -= 1 try: #print('?', repr(src), file=sys.stderr) for tree in ie.parse(src): success += 1 break if log: print(maximum - my_count, '+', repr(src), success, file=sys.stderr) except: #print("Error:", sys.exc_info()[0], file=sys.stderr) if log: print(maximum - my_count, '-', repr(src), file=sys.stderr) pass except: pass return (success, maximum)### Check PrecisionHow many of the inputs produced using our grammar are valid? (Accepted by the program).
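The precision loop can be illustrated with toy stand-ins for the producer and the program under test (`toy_fuzz` and `toy_validate` below are hypothetical; the notebook itself uses fuzzingbook's `GrammarFuzzer` and the subject's `check()`):

```python
import random

# Toy stand-ins (hypothetical; the notebook uses GrammarFuzzer and check()).
def toy_fuzz():
    # produce a random candidate expression over digits and '+'
    return ''.join(random.choice('0123456789+') for _ in range(5))

def toy_validate(s):
    # accept only well-formed sums: no leading, trailing, or doubled '+'
    return not (s.startswith('+') or s.endswith('+') or '++' in s)

def precision(fuzz, validate, maximum=100):
    # precision = fraction of generated inputs the program accepts
    success = sum(1 for _ in range(maximum) if validate(fuzz()))
    return (success, maximum)

ok, total = precision(toy_fuzz, toy_validate)
assert 0 <= ok <= total == 100
```

The `(success, maximum)` tuple mirrors what `check_precision` below returns, so the two can be compared directly.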
xxxxxxxxxxdef check_precision(name, grammar, maximum=Max_Precision, log=False): success = 0 with ExpectError(): fuzzer = GrammarFuzzer.GrammarFuzzer(grammar, start_symbol='<START>') for i in range(maximum): v = fuzzer.fuzz() c = check(v, name) success += (1 if c else 0) if log: print(i, repr(v), c) return (success, maximum)xxxxxxxxxxfrom datetime import datetimexxxxxxxxxxclass timeit(): def __enter__(self): self.tic = datetime.now() return self def __exit__(self, *args, **kwargs): self.delta = datetime.now() - self.tic self.runtime = (self.delta.microseconds, self.delta)xxxxxxxxxxfrom fuzzingbook.ExpectError import ExpectError, ExpectTimeoutxxxxxxxxxxfrom fuzzingbook.Parser import IterativeEarleyParserxxxxxxxxxxdef process(s): # see the rewrite fn. We remove newlines from grammar training to make it easier to visualize return s.strip().replace('\n', ' ')xxxxxxxxxxdef check_parse(grammar, inputstrs, start_symbol='<START>'): count = 0 e = IterativeEarleyParser(grammar, start_symbol=start_symbol) for s in inputstrs: with ExpectError(): with ExpectTimeout(MaxParseTimeout): for tree in e.parse(process(s)): count += 1 break return (count, len(inputstrs))xxxxxxxxxxfrom fuzzingbook.ExpectError import ExpectError, ExpectTimeoutxxxxxxxxxxfrom fuzzingbook import GrammarFuzzer, Parserxxxxxxxxxxdef save_grammar(grammar, tool, program): with open("build/%s-%s.grammar.json" % (tool, program), 'w+') as f: json.dump(grammar, f) return {k:sorted(grammar[k]) for k in grammar}xxxxxxxxxximport stringxxxxxxxxxxMimid_p = {}Mimid_r = {}Autogram_p = {}Autogram_r = {}Mimid_t ={}Autogram_t ={}for k in program_src: Mimid_p[k] = None Mimid_r[k] = None Mimid_t[k] = None Autogram_p[k] = None Autogram_r[k] = None Autogram_t[k] = Nonexxxxxxxxxximport urllib.parsexxxxxxxxxxcgidecode_golden = { "<START>": [ "<cgidecode-s>" ], "<cgidecode-s>": [ '<cgidecode>', '<cgidecode><cgidecode-s>'], "<cgidecode>": [ "<single_char>", "<percentage_char>" ], "<single_char>": list(string.ascii_lowercase + 
string.ascii_uppercase + string.digits + "-./_~"), "<percentage_char>": [urllib.parse.quote(i) for i in string.punctuation if i not in "-./_~"],}xxxxxxxxxxcgidecode_samples = [ "Hello%2c+world%21", 'Send+mail+to+me%40fuzzingbook.org', 'name=Fred&class=101&math=2+%2B+2+%3D+4&', 'name=Fred&status=good&status=happy&', 'http://target/getdata.php?data=%3cscript%20src=%22http%3a%2f%2f', 'www.badplace.com%2fnasty.js%22%3e%3c%2fscript%3e', 'http://target/login.asp?userid=bob%27%3b%20update%20logintable%20set%20passwd%3d%270wn3d%27%3b--%00', 'Colon%20%3A%20Hash%20%23%20Percent%20%25', 'O%21nP%22BG%23JI%24Tw%25mJ%26bB%27xX%28zy%29Aj%2aZ', 'E%2bNp%2cRP%2dVN%2eyV%2ftW%2AIJ%2BAe%2CkM%2DKf%2EB', 'W%2FAo%3azF%3blw%3ctY%3dqy%3eLm%3fCS%3AyB%3BHq%3Ck', 'y%3DZM%3EVH%3FRx%40gG%5bhh%5cjn%5dOD%5eDR%5fcu%5Bm', 'b%5CJm%5Drl%5Ezn%5FKe%60hQ%7bBj%7chf%7dmB%7eyc%7Bp', 'w%7CWd%7DCG%7Ec', 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ', '1234567890', '-./_~']xxxxxxxxxxwith timeit() as t: cgidecode_grammar = accio_grammar('cgidecode.py', VARS['cgidecode_src'], cgidecode_samples)Mimid_t['cgidecode.py'] = t.runtimexxxxxxxxxxsave_grammar(cgidecode_grammar, 'mimid', 'cgidecode')xxxxxxxxxxif 'cgidecode' in CHECK: result = check_precision('cgidecode.py', cgidecode_grammar) Mimid_p['cgidecode.py'] = result print(result)xxxxxxxxxximport subjects.cgidecodexxxxxxxxxxif 'cgidecode' in CHECK: result = check_recall(cgidecode_golden, cgidecode_grammar, subjects.cgidecode.main) Mimid_r['cgidecode.py'] = result print(result)xxxxxxxxxx%%timewith timeit() as t: autogram_cgidecode_grammar_t = recover_grammar_with_taints('cgidecode.py', VARS['cgidecode_src'], cgidecode_samples)Autogram_t['cgidecode.py'] = t.runtimexxxxxxxxxxsave_grammar(autogram_cgidecode_grammar_t, 'autogram_t', 'cgidecode')xxxxxxxxxxif 'cgidecode' in CHECK: result = check_precision('cgidecode.py', autogram_cgidecode_grammar_t) Autogram_p['cgidecode.py'] = result print(result)xxxxxxxxxxif 'cgidecode' in CHECK: result = 
check_recall(cgidecode_golden, autogram_cgidecode_grammar_t, subjects.cgidecode.main) Autogram_r['cgidecode.py'] = result print(result)xxxxxxxxxxcalc_golden = { "<START>": [ "<expr>" ], "<expr>": [ "<term>+<expr>", "<term>-<expr>", "<term>" ], "<term>": [ "<factor>*<term>", "<factor>/<term>", "<factor>" ], "<factor>": [ "(<expr>)", "<number>" ], "<number>": [ "<integer>.<integer>", "<integer>" ], "<integer>": [ "<digit><integer>", "<digit>" ], "<digit>": [ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9" ]}xxxxxxxxxxcalc_samples=[i.strip() for i in '''\(1+2)*3/(423-334+9983)-5-((6)-(701))(123+133*(12-3)/9+8)+33(100)21*333/44+210023*234*22*4(123+133*(12-3)/9+8)+331+231/20-2555+(234-445)1-(41/2)443-334+33-222'''.split('\n') if i.strip()]xxxxxxxxxx%%timewith timeit() as t: calc_grammar = accio_grammar('calculator.py', VARS['calc_src'], calc_samples)Mimid_t['calculator.py'] = t.runtimexxxxxxxxxxsave_grammar(calc_grammar, 'mimid', 'calculator')xxxxxxxxxxif 'calculator' in CHECK: result = check_precision('calculator.py', calc_grammar) Mimid_p['calculator.py'] = result print(result)xxxxxxxxxximport subjects.calculatorxxxxxxxxxxif 'calculator' in CHECK: result = check_recall(calc_golden, calc_grammar, subjects.calculator.main) Mimid_r['calculator.py'] = result print(result)xxxxxxxxxx%%timewith timeit() as t: autogram_calc_grammar_t = recover_grammar_with_taints('calculator.py', VARS['calc_src'], calc_samples)Autogram_t['calculator.py'] = t.runtimexxxxxxxxxxsave_grammar(autogram_calc_grammar_t, 'autogram_t', 'calculator')xxxxxxxxxxif 'calculator' in CHECK: result = check_precision('calculator.py', autogram_calc_grammar_t) Autogram_p['calculator.py'] = result print(result)xxxxxxxxxxif 'calculator' in CHECK: result = check_recall(calc_golden, autogram_calc_grammar_t, subjects.calculator.main) Autogram_r['calculator.py'] = result print(result)xxxxxxxxxxmathexpr_golden = { "<START>": [ "<expr>" ], "<word>": [ "pi", "e", "phi", "abs", "acos", "asin", "atan", "atan2", "ceil", 
"cos", "cosh", "degrees", "exp", "fabs", "floor", "fmod", "frexp", "hypot", "ldexp", "log", "log10", "modf", "pow", "radians", "sin", "sinh", "sqrt", "tan", "tanh", "<alpha>" ], "<alpha>": [ "a", "b", "c", "d", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"], "<expr>": [ "<term>+<expr>", "<term>-<expr>", "<term>" ], "<term>": [ "<factor>*<term>", "<factor>/<term>", "<factor>" ], "<factor>": [ "+<factor>", "-<factor>", "(<expr>)", "<word>(<expr>,<expr>)", "<word>", "<number>" ], "<number>": [ "<integer>.<integer>", "<integer>" ], "<integer>": [ "<digit><integer>", "<digit>" ], "<digit>": [ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9" ]}xxxxxxxxxxmathexpr_samples=[i.strip() for i in '''(pi*e+2)*3/(423-334+9983)-5-((6)-(701-x))(123+133*(12-3)/9+8)+33(100)pi * e(1 - 1 + -1) * pi1.0 / 3 * 6(x + e * 10) / 10(a + b) / c1 + pi / 4(1-2)/3.0 + 0.0000-(1 + 2) * 3(1 + 2) * 31001 + 2 * 323*234*22*4(123+133*(12-3)/9+8)+331+231/20-2555+(234-445)1-(41/2)443-334+33-222cos(x+4*3) + 2 * 3exp(0)-(1 + 2) * 3(1-2)/3.0 + 0.0000abs(-2) + pi / 4(pi + e * 10) / 101.0 / 3 * 6cos(pi) * 1atan2(2, 1)hypot(5, 12)pow(3, 5)'''.strip().split('\n') if i.strip()]xxxxxxxxxx%%timewith timeit() as t: mathexpr_grammar = accio_grammar('mathexpr.py', VARS['mathexpr_src'], mathexpr_samples, cache=False)Mimid_t['mathexpr.py'] = t.runtimexxxxxxxxxxsave_grammar(mathexpr_grammar, 'mimid', 'mathexpr')xxxxxxxxxxif 'mathexpr' in CHECK: result = check_precision('mathexpr.py', mathexpr_grammar) Mimid_p['mathexpr.py'] = result print(result)xxxxxxxxxximport subjects.mathexprxxxxxxxxxxif 'mathexpr' in CHECK: result = check_recall(mathexpr_golden, mathexpr_grammar, subjects.mathexpr.main) Mimid_r['mathexpr.py'] = result print(result)xxxxxxxxxx%%timewith timeit() as t: autogram_mathexpr_grammar_t = recover_grammar_with_taints('mathexpr.py', VARS['mathexpr_src'], mathexpr_samples)Autogram_t['mathexpr.py'] = 
t.runtimexxxxxxxxxxsave_grammar(autogram_mathexpr_grammar_t, 'autogram_t', 'mathexpr')xxxxxxxxxxif 'mathexpr' in CHECK: result = check_precision('mathexpr.py', autogram_mathexpr_grammar_t) Autogram_p['mathexpr.py'] = result print(result)xxxxxxxxxxif 'mathexpr' in CHECK: result = check_recall(mathexpr_golden, autogram_mathexpr_grammar_t, subjects.mathexpr.main) Autogram_r['mathexpr.py'] = result print(result)xxxxxxxxxxurlparse_golden = { "<START>": [ "<url>" ], "<url>": [ "<scheme>://<authority><path><query>" ], "<scheme>": [ "http", "https", "ftp", "ftps" ], "<authority>": [ "<host>", "<host>:<port>", "<userinfo>@<host>", "<userinfo>@<host>:<port>" ], "<user>": [ "user1", "user2", "user3", "user4", "user5" ], "<pass>": [ "pass1", "pass2", "pass3", "pass4", "pass5" ], "<host>": [ "host1", "host2", "host3", "host4", "host5" ], "<port>": [ "<nat>" ], "<nat>": [ "10", "20", "30", "40", "50" ], "<userinfo>": [ "<user>:<pass>" ], "<path>": [ "", "/", "/<id>", "/<id><path>" ], "<id>": [ "folder" ], "<query>": [ "", "?<params>" ], "<params>": [ "<param>", "<param>&<params>" ], "<param>": [ "<key>=<value>" ], "<key>": [ "key1", "key2", "key3", "key4" ], "<value>": [ "value1", "value2", "value3", "value4" ]}xxxxxxxxxxurlparse_samples = [i.strip() for i in 
'''http://www.python.orghttp://www.python.org#abchttp://www.python.org'http://www.python.org#abc'http://www.python.org?q=abchttp://www.python.org/#abchttp://a/b/c/d;p?q#fhttps://www.python.orghttps://www.python.org#abchttps://www.python.org?q=abchttps://www.python.org/#abchttps://a/b/c/d;p?q#fhttp://www.python.org?q=abcfile:///tmp/junk.txtimap://mail.python.org/mbox1mms://wms.sys.hinet.net/cts/Drama/09006251100.asfnfs://server/path/to/file.txtsvn+ssh://svn.zope.org/repos/main/ZConfig/trunk/git+ssh://git@github.com/user/project.gitfile:///tmp/junk.txtimap://mail.python.org/mbox1mms://wms.sys.hinet.net/cts/Drama/09006251100.asfnfs://server/path/to/file.txthttp://www.python.org/#abcsvn+ssh://svn.zope.org/repos/main/ZConfig/trunk/git+ssh://git@github.com/user/project.gitg:hhttp://a/b/c/ghttp://a/b/c/g/http://a/ghttp://ghttp://a/b/c/g?yhttp://a/b/c/g?y/./xhttp://a/b/c/d;p?q#fhttp://a/b/c/d;p?q#shttp://a/b/c/g#shttp://a/b/c/g#s/./xhttp://a/b/c/g?y#shttp://a/b/c/g;xhttp://a/b/c/g;x?y#shttp://a/b/c/http://a/b/https://www.python.orghttp://a/b/ghttp://a/http://a/ghttp://a/b/c/d;p?q#fhttp://a/../gg:hhttp://a/b/c/ghttp://a/b/c/g/https://www.python.org#abchttp://ghttp://a/b/c/g?yhttp://a/b/c/d;p?q#shttp://a/b/c/g#shttp://a/b/c/g?y#shttp://a/b/c/g;xhttp://a/b/c/g;x?y#shttps://www.python.org?q=abchttps://www.python.org/#abchttp://[::1]:5432/foo/http://[dead:beef::1]:5432/foo/http://[dead:beef::]:5432/foo/http://[dead:beef:cafe:5417:affe:8FA3:deaf:feed]:5432/foo/http://[::12.34.56.78]:5432/foo/http://[::ffff:12.34.56.78]:5432/foo/http://Test.python.org/foo/http://12.34.56.78/foo/http://[::1]/foo/http://[dead:beef::1]/foo/https://a/b/c/d;p?q#fhttp://[dead:beef::]/foo/http://[dead:beef:cafe:5417:affe:8FA3:deaf:feed]/foo/http://[::12.34.56.78]/foo/http://[::ffff:12.34.56.78]/foo/http://Test.python.org:5432/foo/http://12.34.56.78:5432/foo/http://[::1]:5432/foo/http://[dead:beef::1]:5432/foo/http://[dead:beef::]:5432/foo/http://[dead:beef:cafe:5417:affe:8FA3:deaf:feed]:5432/foo/'''.stri
p().split('\n') if i.strip()]Unfortunately, as we detail in the paper, both miners are unable to generalize well with the kind of inputs above. The problem is the lack of generalization of string tokens. Hence we use the ones below, which are generated using the _golden grammar_. This is the output of simply using the golden grammar to fuzz and generate 100 inputs. Captured here for deterministic reproduction.
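Producing training samples by fuzzing a golden grammar can be sketched as follows. This is a minimal random expander over a toy grammar (the grammar and `expand` helper are illustrative only; the notebook uses fuzzingbook's `GrammarFuzzer` on the real golden grammars):

```python
import random
import re

# Toy golden grammar (illustrative); nonterminals are written <like-this>.
toy_grammar = {
    '<START>': ['<digits>'],
    '<digits>': ['<digit>', '<digit><digits>'],
    '<digit>': list('0123456789'),
}

def expand(symbol, grammar, depth=0):
    # pick a random alternative; past a depth cap, force the first
    # alternative, which in this toy grammar is the least recursive one
    alternatives = grammar[symbol]
    rule = alternatives[0] if depth > 100 else random.choice(alternatives)
    parts = re.split(r'(<[^<> ]+>)', rule)
    return ''.join(expand(p, grammar, depth + 1) if p.startswith('<') else p
                   for p in parts)

samples = [expand('<START>', toy_grammar) for _ in range(100)]
assert all(s.isdigit() for s in samples)
```

Fuzzing the golden grammar this way guarantees every training sample is valid by construction, which sidesteps the string-token generalization problem described above.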
xxxxxxxxxxurlparse_samples = [i.strip() for i in '''https://user4:pass2@host2:30/folder//?key1=value3ftp://user2:pass5@host2?key3=value1ftp://host1/folder//ftp://host4:30/folderhttp://user1:pass4@host1/folderhttps://user1:pass4@host4ftp://host3:40/http://user5:pass3@host1:10/http://host4:10ftp://host4/folder//?key4=value2https://host5/folderftp://user4:pass5@host4/folder//folder//folder/ftp://user5:pass2@host3https://host2/https://user4:pass3@host3/folderhttp://host5:50https://host3/folder?key3=value3http://user5:pass3@host1/folder?key1=value4&key4=value2&key2=value1&key2=value3https://user4:pass3@host1/folderhttp://user3:pass3@host2:40/ftp://host2/folder?key2=value3https://user4:pass4@host2:50/folder/https://user3:pass5@host4?key4=value1ftp://user3:pass3@host1:40?key1=value3https://user1:pass1@host3:50ftps://user2:pass2@host3/https://host4:30/folderhttp://host5/folder/?key2=value2ftps://host3:10/folder/ftp://user4:pass4@host5/folderhttp://user2:pass2@host4:10/folder//folder//folder/ftp://host1:10/folder/ftp://host3?key3=value1&key1=value3ftp://user5:pass2@host4/folder//http://host2ftps://user5:pass3@host3:30ftp://host5/folderhttps://user2:pass2@host4:20/?key2=value4&key1=value2&key3=value3&key3=value2&key4=value3https://host3/folder//folder//folderftp://user2:pass3@host4:50/ftps://user5:pass5@host4/ftps://user3:pass3@host5?key3=value3ftp://host4?key1=value3&key3=value3&key3=value1https://host3/?key4=value2&key1=value2&key4=value3&key2=value4ftps://host1/folder//ftp://host5/folder//https://user2:pass1@host5:10/folder//http://user5:pass5@host2:10/folderhttps://host5/folderftps://user5:pass3@host4:40/?key1=value3http://user1:pass3@host4/folder//?key4=value4&key3=value3http://user2:pass2@host5:50/folder?key4=value3&key4=value2http://host3?key3=value3&key2=value2https://user3:pass3@host2:20/folderhttps://host5/folder?key2=value1&key3=value2&key1=value4&key3=value4&key3=value1&key1=value2&key1=value2ftp://user2:pass5@host5:40/?key4=value4https://user3:pass4@host2:20/ftps
://host3:30/?key3=value1ftp://host3/folderftps://user1:pass1@host5:20/?key3=value1https://user4:pass5@host3?key4=value2ftp://host4:40/folder?key3=value1ftps://host2/folder//folderhttps://host2https://user2:pass5@host5:50?key1=value4&key1=value1&key2=value1&key2=value1https://user4:pass5@host1/?key1=value2&key1=value1http://host4:40/folder?key4=value3&key4=value2http://host1:40ftps://host3:30/ftps://host1/folder/?key4=value1&key1=value4http://user1:pass1@host1:10/folder/?key2=value2&key2=value3&key3=value4http://host3/folder?key2=value2ftps://user4:pass3@host3:50/?key1=value4ftp://host2/folder//folderftp://user2:pass4@host4:40/folder?key3=value2&key2=value1&key2=value2&key4=value3&key3=value3&key3=value1ftps://user4:pass5@host4:50?key4=value2https://host3:10ftp://user1:pass3@host3:10/folder/ftps://host4:30/ftp://user4:pass2@host1/folder/?key3=value2&key2=value4&key1=value3&key3=value2https://host2/folder?key3=value3&key4=value4&key2=value2ftp://host2:50/?key2=value4&key2=value4&key4=value1&key2=value2&key2=value3&key4=value1ftps://user2:pass4@host2/ftps://host3:40/ftps://user4:pass5@host2/ftp://host2:10/?key3=value3&key4=value1http://host2/folder/?key3=value1&key2=value4https://host5/folder?key4=value2https://user3:pass4@host1:20ftp://user3:pass3@host5/https://user1:pass4@host5/https://user3:pass2@host1/folder//ftps://host5:30?key1=value1&key2=value3&key3=value2&key2=value3&key4=value2&key2=value3ftps://user2:pass5@host3:30?key3=value2ftps://host4:10/?key1=value1&key4=value3https://host2:30https://host5:40/folderhttp://user2:pass4@host5:50/folderftp://user5:pass1@host3:50?key3=value2&key1=value4ftp://host1/folder//folder'''.strip().split('\n') if i.strip()]xxxxxxxxxx%%timewith timeit() as t: urlparse_grammar = accio_grammar('urlparse.py', VARS['urlparse_src'], urlparse_samples)Mimid_t['urlparse.py'] = t.runtimexxxxxxxxxxsave_grammar(urlparse_grammar, 'mimid', 'urlparse')xxxxxxxxxxif 'urlparse' in CHECK: result = check_precision('urlparse.py', urlparse_grammar) 
Mimid_p['urlparse.py'] = result print(result)xxxxxxxxxximport subjects.urlparsexxxxxxxxxxif 'urlparse' in CHECK: result = check_recall(urlparse_golden, urlparse_grammar, subjects.urlparse.main) Mimid_r['urlparse.py'] = result print(result)xxxxxxxxxx%%timewith timeit() as t: autogram_urlparse_grammar_t = recover_grammar_with_taints('urlparse.py', VARS['urlparse_src'], urlparse_samples)Autogram_t['urlparse.py'] = t.runtimexxxxxxxxxxsave_grammar(autogram_urlparse_grammar_t, 'autogram_t', 'urlparse')xxxxxxxxxxif 'urlparse' in CHECK: result = check_precision('urlparse.py', autogram_urlparse_grammar_t) Autogram_p['urlparse.py'] = result print(result)xxxxxxxxxxif 'urlparse' in CHECK: result = check_recall(urlparse_golden, autogram_urlparse_grammar_t, subjects.urlparse.main) Autogram_r['urlparse.py'] = result print(result)xxxxxxxxxxnetrc_golden = { "<START>": [ "<entries>" ], "<entries>": [ "<entry><whitespace><entries>", "<entry>" ], "<entry>": [ "machine<whitespace><mvalue><whitespace><fills>", "default<whitespace><fills>" ], "<whitespace>": [ " " ], "<mvalue>": [ "m1", "m2", "m3" ], "<accvalue>": [ "a1", "a2", "a3" ], "<uservalue>": [ "u1", "u2", "u3" ], "<passvalue>": [ "pwd1", "pwd2", "pwd3" ], "<lvalue>": [ "l1", "l2", "l3" ], "<fills>": [ "<fill>", "<fill><whitespace><fills>" ], "<fill>": [ "account<whitespace><accvalue>", "username<whitespace><uservalue>", "password<whitespace><passvalue>", "login<whitespace><lvalue>" ]}xxxxxxxxxxnetrc_samples = [i.strip().replace('\n', ' ') for i in ['''machine m1 login u1 password pwd1''','''machine m2 login u1 password pwd2''','''default login u1 password pwd1''','''machine m1 login u2 password pwd1''','''machine m2 login u2 password pwd2 machine m1 login l1 password pwd1''','''machine m1 login u1 password pwd1 machine m2 login l2 password pwd2''','''machine m2 password pwd2 login u2''','''machine m1 password pwd1 login u1''','''machine m2 login u2 password pwd1''','''default login u2 password pwd3''','''machine m2
login u2 password pwd1 machine m3 login u3 password pwd1 machine m1 login u1 password pwd2''','''machine m2 login u2 password pwd3machine m1 login u1 password pwd1''','''default login u1 password pwd3machine m2 login u1 password pwd1''','''machine m1 login l1 password p1machine m2 login l2 password p2default login m1 password p1''']]As with `urlparse`, we had to use a restricted set of keywords with _netrc_. The below words are produced from fuzzing the golden grammar, captured here for deterministic reproduction.
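With samples in hand, the recall side of the evaluation reduces to counting how many golden-grammar samples a parser for the mined grammar accepts. A minimal sketch with hypothetical stand-ins (`golden_samples` and `mined_recognizer` are illustrative; the notebook parses with `IterativeEarleyParser` instead):

```python
# Hypothetical stand-ins: golden-grammar samples, and a recognizer for a
# mined grammar that only learned plain digit strings.
golden_samples = ['1', '12', '123', '1a3']

def mined_recognizer(s):
    return s.isdigit()

def recall(samples, recognize):
    # recall = fraction of valid samples the mined grammar can parse
    success = sum(1 for s in samples if recognize(s))
    return (success, len(samples))

print(recall(golden_samples, mined_recognizer))  # -> (3, 4)
```

Here the mined grammar fails to cover `'1a3'`, so recall is 3 out of 4; `check_recall` above computes the same ratio with real parsers.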
xxxxxxxxxxnetrc_samples = [i.strip() for i in '''machine m1 password pwd3 login l3machine m1 login l3 account a3 login l1 password pwd2machine m2 password pwd2machine m2 password pwd2 account a2machine m2 password pwd3machine m2 password pwd1machine m1 login l3 password pwd1machine m2 password pwd3machine m1 password pwd2 account a1 account a2machine m2 password pwd3machine m2 account a1 password pwd3machine m3 login l3 account a2 password pwd3machine m2 password pwd2 login l3 password pwd2 password pwd2machine m3 password pwd2 login l3machine m3 login l3 account a3 account a2 password pwd3machine m1 password pwd2machine m2 account a3 password pwd3machine m3 password pwd2machine m3 password pwd1 account a1machine m2 password pwd1machine m1 account a1 password pwd1machine m2 login l1 login l2 account a2 login l3 password pwd2 password pwd2 password pwd2machine m3 account a3 login l3 account a1 password pwd3machine m1 password pwd1machine m2 password pwd3machine m2 password pwd3machine m1 account a1 password pwd1 account a1 password pwd3machine m3 password pwd3machine m3 password pwd2machine m2 account a1 account a1 account a2 password pwd2 account a1machine m3 password pwd1 login l2 login l1machine m1 account a3 account a3 password pwd1 machine m3 password pwd2machine m1 login l1 password pwd1machine m3 password pwd2 login l1 machine m1 password pwd2machine m3 account a2 password pwd1machine m1 password pwd3machine m3 login l2 account a2 password pwd2machine m2 password pwd3 machine m2 account a1 login l3 password pwd3 password pwd2machine m1 password pwd2machine m1 password pwd2machine m1 password pwd2machine m2 password pwd3 password pwd2machine m2 login l1 password pwd1 account a1machine m3 password pwd1machine m2 password pwd3 password pwd1machine m1 password pwd3 password pwd3 password pwd1machine m2 password pwd1 password pwd1machine m2 login l2 account a3 password pwd3machine m1 password pwd1machine m1 account a3 password pwd3 account a2 password pwd2 account 
a3 account a3 account a3machine m3 password pwd3 password pwd3 machine m2 password pwd3machine m2 password pwd2 login l2 login l1machine m1 login l3 password pwd2machine m2 login l2 password pwd1machine m2 account a3 password pwd2machine m1 account a2 password pwd1machine m3 login l1 password pwd2 account a2machine m1 password pwd3machine m3 password pwd2machine m1 password pwd3 password pwd3 password pwd1 machine m2 password pwd3machine m1 account a2 account a1 login l2 password pwd2machine m1 login l1 password pwd2 password pwd2 login l3machine m2 password pwd1 password pwd2machine m1 password pwd3 account a3machine m1 login l1 login l2 password pwd2machine m1 account a1 password pwd1 login l2machine m2 password pwd1 login l3machine m2 password pwd2 password pwd1 password pwd3machine m1 password pwd1 account a1 account a2 login l1machine m1 password pwd3machine m2 login l3 password pwd3machine m3 login l2 login l2 password pwd1 login l2machine m2 password pwd1machine m1 password pwd1 login l3 account a2 login l3 password pwd1machine m3 password pwd3machine m3 password pwd1 account a1machine m2 login l3 password pwd1 account a3machine m3 password pwd3machine m2 password pwd1machine m1 login l3 password pwd1 password pwd1machine m3 password pwd3machine m2 password pwd2 login l3 login l2 login l1 account a1machine m1 password pwd1machine m2 password pwd2 login l3machine m2 password pwd2machine m2 password pwd1machine m3 password pwd3machine m1 password pwd1machine m2 account a3 password pwd1machine m2 login l1 password pwd3machine m3 password pwd2 login l1 machine m2 password pwd1machine m2 login l2 account a2 password pwd1 login l2 account a1machine m1 password pwd2machine m3 login l1 password pwd1machine m3 account a2 password pwd2machine m2 login l1 password pwd3 login l2 account a2machine m3 account a1 password pwd2machine m3 login l3 login l3 password pwd1 password pwd1machine m3 password pwd2 password pwd2 password pwd2 account a2machine m3 password 
pwd1'''.strip().split('\n') if i.strip()]xxxxxxxxxx%%timewith timeit() as t: netrc_grammar = accio_grammar('netrc.py', VARS['netrc_src'], netrc_samples)Mimid_t['netrc.py'] = t.runtimexxxxxxxxxxsave_grammar(netrc_grammar, 'mimid', 'netrc')xxxxxxxxxxif 'netrc' in CHECK: result = check_precision('netrc.py', netrc_grammar) Mimid_p['netrc.py'] = result print(result)xxxxxxxxxx!cp build/mylex.py .!cp build/myio.py .xxxxxxxxxximport subjects.netrcxxxxxxxxxxif 'netrc' in CHECK: result = check_recall(netrc_golden, netrc_grammar, subjects.netrc.main) Mimid_r['netrc.py'] = result print(result)xxxxxxxxxx%%timewith timeit() as t: autogram_netrc_grammar_t = recover_grammar_with_taints('netrc.py', VARS['netrc_src'], netrc_samples)Autogram_t['netrc.py'] = t.runtimexxxxxxxxxxsave_grammar(autogram_netrc_grammar_t, 'autogram_t', 'netrc')xxxxxxxxxxif 'netrc' in CHECK: result = check_precision('netrc.py', autogram_netrc_grammar_t) Autogram_p['netrc.py'] = result print(result)xxxxxxxxxxif 'netrc' in CHECK: result = check_recall(netrc_golden, autogram_netrc_grammar_t, subjects.netrc.main) Autogram_r['netrc.py'] = result print(result)Recall for the mined microjson grammars is checked against real-world JSON files. This is done through `json.tar.gz`.
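The archive can also be extracted portably with the stdlib `tarfile` module rather than shelling out to `gzcat`/`zcat` as the notebook does below; a self-contained sketch using a tiny in-memory archive in place of the real `json.tar.gz`:

```python
import io
import tarfile

# Build a tiny in-memory .tar.gz so the sketch is self-contained
# (stands in for the real json.tar.gz on disk).
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode='w:gz') as tf:
    data = b'{"k": 1}'
    info = tarfile.TarInfo('recall/json/sample.json')
    info.size = len(data)
    tf.addfile(info, io.BytesIO(data))

# Read a member back out, as a tarfile-based extraction would.
buf.seek(0)
with tarfile.open(fileobj=buf, mode='r:gz') as tf:
    member = tf.getmember('recall/json/sample.json')
    content = tf.extractfile(member).read().decode()
assert content == '{"k": 1}'
```

For the real archive, `tarfile.open('json.tar.gz', 'r:gz')` followed by `extractall` would replace the `gzcat`/`zcat` branches and remove the platform dependency.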
xxxxxxxxxx# json samplesjson_samples = [i.strip().replace('\n', ' ') for i in ['''{"abcd":[], "efgh":{"y":[], "pqrstuv": null, "p": "", "q":"" , "r": "" , "float1": 1.0, "float2":1.0, "float3":1.0 , "float4": 1.0 , "_124": {"wx" : null, "zzyym!!2@@39": [1.1, 2452, 398, {"x":[[4,53,6,[7 ,8,90 ],10]]}]} } }''','''{"mykey1": [1, 2, 3], "mykey2": null, "mykey":"'`:{}<>&%[]\\\\^~|$'"}''','''{"emptya": [], "emptyh": {}, "emptystr":"", "null":null}''', '''[ "JSON Test Pattern pass1", {"object with 1 member":["array with 1 element"]}, {}, [], -42, true, false, null, { "integer": 1234567890, "real": -9876.543210, "e": 0.123456789e-12, "E": 1.234567890E+34, "": 23456789012E66, "zero": 0, "one": 1, "space": " ", "quote": "\\"", "backslash": "\\\\", "controls": "\\b\\f\\n\\r\\t", "slash": "/ & \\/", "alpha": "abcdefghijklmnopqrstuvwyz", "ALPHA": "ABCDEFGHIJKLMNOPQRSTUVWYZ", "digit": "0123456789", "0123456789": "digit", "special": "`1~!@#$%^&*()_+-={':[,]}|;.</>?", "true": true, "false": false, "null": null, "array":[ ], "object":{ }, "address": "50 St. 
James Street", "url": "http://www.JSON.org/", "comment": "// /* <!-- --", "# -- --> */": " ", " s p a c e d " :[1,2 , 3,4 , 5 , 6 ,7 ],"compact":[1,2,3,4,5,6,7], "jsontext": "{\\"object with 1 member\\":[\\"array with 1 element\\"]}", "quotes": "" %22 0x22 034 "", "\\/\\\\\\"\\b\\f\\n\\r\\t`1~!@#$%^&*()_+-=[]{}|;:',./<>?": "A key can be any string" }, 0.5 ,98.6,99.44,1066,1e1,0.1e1,1e-1,1e00,2e+00,2e-00,"rosebud"]''', '''{"menu": { "id": "file", "value": "File", "popup": { "menuitem": [ {"value": "New", "onclick": "CreateNewDoc()"}, {"value": "Open", "onclick": "OpenDoc()"}, {"value": "Close", "onclick": "CloseDoc()"} ] } }}''', '''{ "XMDEwOlJlcG9zaXRvcnkxODQ2MjA4ODQ=": "-----BEGIN PGP SIGNATURE-----\n\niQIzBAABAQAdFiEESn/54jMNIrGSE6Tp6cQjvhfv7nAFAlnT71cACgkQ6cQjvhfv\n7nCWwA//XVqBKWO0zF+ bZl6pggvky3Oc2j1pNFuRWZ29LXpNuD5WUGXGG209B0hI\nDkmcGk19ZKUTnEUJV2Xd0R7AW01S/YSub7OYcgBkI7qUE13FVHN5ln1KvH2all2n\n2+JCV1HcJLEoTjqIFZSSu/sMdhkLQ9/NsmMAzpf/ iIM0nQOyU4YRex9eD1bYj6nA\nOQPIDdAuaTQj1gFPHYLzM4zJnCqGdRlg0sOM/zC5apBNzIwlgREatOYQSCfCKV7k\nnrU34X8b9BzQaUx48Qa+Dmfn5KQ8dl27RNeWAqlkuWyv3pUauH9UeYW+KyuJeMkU\n+ NyHgAsWFaCFl23kCHThbLStMZOYEnGagrd0hnm1TPS4GJkV4wfYMwnI4KuSlHKB\njHl3Js9vNzEUQipQJbgCgTiWvRJoK3ENwBTMVkKHaqT4x9U4Jk/ XZB6Q8MA09ezJ\n3QgiTjTAGcum9E9QiJqMYdWQPWkaBIRRz5cET6HPB48YNXAAUsfmuYsGrnVLYbG+ \nUpC6I97VybYHTy2O9XSGoaLeMI9CsFn38ycAxxbWagk5mhclNTP5mezIq6wKSwmr\nX11FW3n1J23fWZn5HJMBsRnUCgzqzX3871IqLYHqRJ/bpZ4h20RhTyPj5c/z7QXp\neSakNQMfbbMcljkha+ ZMuVQX1K9aRlVqbmv3ZMWh+OijLYVU2bc=\n=5Io4\n-----END PGP SIGNATURE-----\n"}''', '''{"widget": { "debug": "on", "window": { "title": "Sample Konfabulator Widget", "name": "main_window", "width": 500, "height": 500 }, "image": { "src": "Images/Sun.png", "name": "sun1", "hOffset": 250, "vOffset": 250, "alignment": "center" }, "text": { "data": "Click Here", "size": 36, "style": "bold", "name": "text1", "hOffset": 250, "vOffset": 100, "alignment": "center", "onMouseUp": "sun1.opacity = (sun1.opacity / 100) * 90;" } }}''','''{ "fruit": 
"Apple", "size": "Large", "color": "Red", "product": "Jam"}''','''{"menu": { "header": "SVG Viewer", "items": [ {"id": "Open"}, {"id": "OpenNew", "label": "Open New"}, null, {"id": "ZoomIn", "label": "Zoom In"}, {"id": "ZoomOut", "label": "Zoom Out"}, {"id": "OriginalView", "label": "Original View"}, null, {"id": "Quality"}, {"id": "Pause"}, {"id": "Mute"}, null, {"id": "Find", "label": "Find..."}, {"id": "FindAgain", "label": "Find Again"}, {"id": "Copy"}, {"id": "CopyAgain", "label": "Copy Again"}, {"id": "CopySVG", "label": "Copy SVG"}, {"id": "ViewSVG", "label": "View SVG"}, {"id": "ViewSource", "label": "View Source"}, {"id": "SaveAs", "label": "Save As"}, null, {"id": "Help"}, {"id": "About", "label": "About Adobe CVG Viewer..."} ] }}''','''{ "quiz": { "sport": { "q1": { "question": "Which one is correct team name in NBA?", "options": [ "New York Bulls", "Los Angeles Kings", "Golden State Warriros", "Huston Rocket" ], "answer": "Huston Rocket" } }, "maths": { "q1": { "question": "5 + 7 = ?", "options": [ "10", "11", "12", "13" ], "answer": "12" }, "q2": { "question": "12 - 8 = ?", "options": [ "1", "2", "3", "4" ], "answer": "4" } } }}''','''{ "colors": [ { "color": "black", "category": "hue", "type": "primary", "code": { "rgba": [255,255,255,1], "hex": "#000" } }, { "color": "white", "category": "value", "code": { "rgba": [0,0,0,1], "hex": "#FFF" } }, { "color": "red", "category": "hue", "type": "primary", "code": { "rgba": [255,0,0,1], "hex": "#FF0" } }, { "color": "blue", "category": "hue", "type": "primary", "code": { "rgba": [0,0,255,1], "hex": "#00F" } }, { "color": "yellow", "category": "hue", "type": "primary", "code": { "rgba": [255,255,0,1], "hex": "#FF0" } }, { "color": "green", "category": "hue", "type": "secondary", "code": { "rgba": [0,255,0,1], "hex": "#0F0" } } ]}''','''{ "aliceblue": "#f0f8ff", "antiquewhite": "#faebd7", "aqua": "#00ffff", "aquamarine": "#7fffd4", "azure": "#f0ffff", "beige": "#f5f5dc", "bisque": "#ffe4c4", "black": 
"#000000", "blanchedalmond": "#ffebcd", "blue": "#0000ff", "blueviolet": "#8a2be2", "brown": "#a52a2a", "majenta": "#ff0ff"}''']]xxxxxxxxxx%%timewith timeit() as t: microjson_grammar = accio_grammar('microjson.py', VARS['microjson_src'], json_samples)Mimid_t['microjson.py'] = t.runtimexxxxxxxxxxsave_grammar(microjson_grammar, 'mimid', 'microjson')xxxxxxxxxxif 'microjson' in CHECK: result = check_precision('microjson.py', microjson_grammar) Mimid_p['microjson.py'] = result print(result)xxxxxxxxxximport subjects.microjsonxxxxxxxxxximport pathlibxxxxxxxxxxdef slurp(fn): with open(fn) as f: s = f.read() return s.replace('\n', ' ').strip()xxxxxxxxxxif shutil.which('gzcat'): !gzcat json.tar.gz | tar -xpf -elif shutil.which('zcat'): !zcat json.tar.gz | tar -xpf -else: assert Falsexxxxxxxxxxjson_path = pathlib.Path('recall/json')json_files = [i.as_posix() for i in json_path.glob('**/*.json')]json_samples_2 = [slurp(i) for i in json_files]xxxxxxxxxxdef check_recall_samples(samples, my_grammar, validator, log=False): n_max = len(samples) ie = IterativeEarleyParser(my_grammar, start_symbol='<START>') my_samples = list(samples) count = 0 while my_samples: src, *my_samples = my_samples try: validator(src) try: # JSON files are much larger because they are from real world for tree in ie.parse(src): count += 1 break if log: print('+', repr(src), count, file=sys.stderr) except: if log: print('-', repr(src), file=sys.stderr) except: pass return (count, n_max)xxxxxxxxxxif 'microjson' in CHECK: result = check_recall_samples(json_samples_2, microjson_grammar, subjects.microjson.main) Mimid_r['microjson.py'] = result print(result)xxxxxxxxxx%%timewith timeit() as t: autogram_microjson_grammar_t = recover_grammar_with_taints('microjson.py', VARS['microjson_src'], json_samples)Autogram_t['microjson.py'] = t.runtimexxxxxxxxxxsave_grammar(autogram_microjson_grammar_t, 'autogram_t', 'microjson')xxxxxxxxxxif 'microjson' in CHECK: result = check_precision('microjson.py', 
autogram_microjson_grammar_t) Autogram_p['microjson.py'] = result print(result)xxxxxxxxxxif 'microjson' in CHECK: result = check_recall_samples(json_samples_2, autogram_microjson_grammar_t, subjects.microjson.main) Autogram_r['microjson.py'] = result print(result)Note that we found and fixed a bug in the Information flow chapter of the fuzzingbook, which was causing Autogram to fail (see `flatten` and `ostr_new` in `recover_grammar_with_taints`). This caused the precision numbers of Autogram to improve. However, please inspect the generated grammars: they are still enumerations.
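To see what "enumeration" means here, compare a hypothetical enumeration-style grammar that simply memorizes its sample inputs with a generalized grammar of the kind Mimid aims to infer. Both grammars below are illustrative, not actual tool output:

```python
# Illustrative only: an "enumeration" grammar memorizes the sample inputs ...
enumeration_grammar = {
    '<START>': [['a=0'], ['b=1'], ['c=2']],
}

# ... while a generalized grammar captures the underlying structure,
# so it also derives unseen inputs such as 'z=9'.
generalized_grammar = {
    '<START>': [['<letter>', '=', '<digit>']],
    '<letter>': [[c] for c in 'abcdefghijklmnopqrstuvwxyz'],
    '<digit>': [[d] for d in '0123456789'],
}

def language(grammar, key='<START>'):
    """Enumerate every string a (finite, non-recursive) grammar derives."""
    results = []
    for rule in grammar[key]:
        parts = [language(grammar, t) if t in grammar else [t] for t in rule]
        combos = ['']
        for alternatives in parts:
            combos = [c + a for c in combos for a in alternatives]
        results.extend(combos)
    return results

print(len(language(enumeration_grammar)))   # 3 memorized strings
print(len(language(generalized_grammar)))   # 260 strings (26 letters x 10 digits)
```

The enumeration grammar's language is exactly its training set, which is why precision can be high while recall on unseen inputs stays poor.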
xxxxxxxxxxfrom IPython.display import HTML, displayxxxxxxxxxxdef show_table(keys, autogram, mimid, title): keys = [k for k in keys if k in autogram and k in mimid and autogram[k] and mimid[k]] tbl = ['<tr>%s</tr>' % ''.join(["<th>%s</th>" % k for k in ['<b>%s</b>' % title,'Autogram', 'Mimid']])] for k in keys: h_c = "<td>%s</td>" % k a_c = "<td>%s</td>" % autogram.get(k,('',0))[0] m_c = "<td>%s</td>" % mimid.get(k,('',0))[0] tbl.append('<tr>%s</tr>' % ''.join([h_c, a_c, m_c])) return display(HTML('<table>%s</table>' % '\n'.join(tbl)))xxxxxxxxxxdef to_sec(hm): return {k:((hm[k][1]).seconds, ' ') for k in hm if hm[k]}xxxxxxxxxxAutogram_txxxxxxxxxxMimid_txxxxxxxxxxshow_table(Autogram_t.keys(), to_sec(Autogram_t), to_sec(Mimid_t), 'Timing')### Table III (Precision)How many of the inputs generated from our inferred grammar are valid, i.e., accepted by the subject program?
Note that the paper reports precision per 100 inputs. We have increased the count to 1000.
xxxxxxxxxxAutogram_pxxxxxxxxxxMimid_pxxxxxxxxxxshow_table(Autogram_p.keys(), Autogram_p, Mimid_p, 'Precision')### Table IV (Recall)How many *valid* inputs, generated by the golden grammar or collected externally, are parsable using our inferred grammar?
Note that the paper reports recall per 100 inputs. We have increased the count to 1000. For Microjson, the recall numbers are based on 100 real-world documents, available in `json.tar.gz`, which is bundled along with this notebook.
xxxxxxxxxxAutogram_rxxxxxxxxxxMimid_rxxxxxxxxxxshow_table(Autogram_p.keys(), Autogram_r, Mimid_r, 'Recall')xxxxxxxxxx%%var calc_rec_srcimport stringdef is_digit(i): return i in list(string.digits)def parse_num(s,i): while s[i:] and is_digit(s[i]): i = i +1 return idef parse_paren(s, i): assert s[i] == '(' i = parse_expr(s, i+1) if s[i:] == '': raise Exception(s, i) assert s[i] == ')' return i+1def parse_expr(s, i = 0): expr = [] is_op = True while s[i:] != '': c = s[i] if c in list(string.digits): if not is_op: raise Exception(s,i) i = parse_num(s,i) is_op = False elif c in ['+', '-', '*', '/']: if is_op: raise Exception(s,i) is_op = True i = i + 1 elif c == '(': if not is_op: raise Exception(s,i) i = parse_paren(s, i) is_op = False elif c == ')': break else: raise Exception(s,i) if is_op: raise Exception(s,i) return idef main(arg): parse_expr(arg)xxxxxxxxxxcalc_rec_grammar = accio_grammar('cal.py', VARS['calc_rec_src'], calc_samples)xxxxxxxxxxcalc_rec_grammarxxxxxxxxxx%%var myparsec_src# From https://github.com/xmonader/pyparsecfrom functools import reduceimport string flatten = lambda l: [item for sublist in l for item in (sublist if isinstance(sublist, list) else [sublist] )]class Maybe: passclass Just(Maybe): def __init__(self, val): self.val = val def __str__(self): return "<Just %s>"%str(self.val) class Nothing(Maybe): _instance = None def __new__(class_, *args, **kwargs): if not isinstance(class_._instance, class_): class_._instance = object.__new__(class_, *args, **kwargs) return class_._instance def __str__(self): return "<Nothing>"class Either: passclass Left: def __init__(self, errmsg): self.errmsg = errmsg def __str__(self): return "(Left %s)"%self.errmsg __repr__ = __str__ def map(self, f): return self class Right: def __init__(self, val): self.val = val def unwrap(self): return self.val def val0(self): if isinstance(self.val[0], list): return flatten(self.val[0]) else: return [self.val[0]] def __str__(self): return "(Right %s)"% str(self.val) __repr__ 
= __str__ def map(self, f): return Right( (f(self.val0), self.val[1])) class Parser: def __init__(self, f, tag=''): self.f = f self._suppressed = False self.tag = tag def parse(self, *args, **kwargs): return self.f(*args, **kwargs) __call__ = parse def __rshift__(self, rparser): return and_then(self, rparser) def __lshift__(self, rparser): return and_then(self, rparser) def __or__(self, rparser): return or_else(self, rparser) def map(self, transformer): return Parser(lambda *args, **kwargs: self.f(*args, **kwargs).map(transformer), self.tag) def __mul__(self, times): return n(self, times) set_action = map def suppress(self): self._suppressed = True return selfdef pure(x): def curried(s): return Right((x, s)) return Parser(curried, 'pure')def ap(p1, p2): def curried(s): res = p2(s) return p1(*res.val[0]) return currieddef compose(p1, p2): def newf(*args, **kwargs): return p2(p1(*args, **kwargs)) return newfdef run_parser(p, inp): return p(inp)def _isokval(v): if isinstance(v, str) and not v.strip(): return False if isinstance(v, list) and v and v[0] == "": return False return Truedef and_then(p1, p2): def curried(s): res1 = p1(s) if isinstance(res1, Left): return res1 else: res2 = p2(res1.val[1]) # parse remaining chars. 
if isinstance(res2, Right): v1 = res1.val0 v2 = res2.val0 vs = [] if not p1._suppressed and _isokval(v1): vs += v1 if not p2._suppressed and _isokval(v2): vs += v2 return Right( (vs, res2.val[1])) return res2 return Parser(curried, 'and_then')def n(parser, count): def curried(s): fullparsed = "" for i in range(count): res = parser(s) if isinstance(res, Left): return res else: parsed, remaining = res.unwrap() s = remaining fullparsed += parsed return Right((fullparsed, s)) return Parser(curried, 'n')def or_else(p1, p2): def curried(s): res = p1(s) if isinstance(res, Right): return res else: res = p2(s) if isinstance(res, Right): return res else: return Left("Failed at both") return Parser(curried, 'or_else')def char(c): def curried(s): if not s: msg = "S is empty" return Left(msg) else: if s[0] == c: return Right((c, s[1:]) ) else: return Left("Expecting '%s' and found '%s'"%(c, s[0])) return Parser(curried, 'char')foldl = reducedef choice(parsers): return foldl(or_else, parsers)def any_of(chars): return choice(list(map(char, chars)))def parse_string(s): return foldl(and_then, list(map(char, list(s)))).map(lambda l: "".join(l))def until_seq(seq): def curried(s): if not s: msg = "S is empty" return Left(msg) else: if seq == s[:len(seq)]: return Right(("", s)) else: return Left("Expecting '%s' and found '%s'"%(seq, s[:len(seq)])) return Parser(curried, 'until_seq')def until(p): def curried(s): res = p(s) if isinstance(res, Left): return res else: return Right(("", s)) return Parser(curried, 'until')chars = parse_stringdef parse_zero_or_more(parser, inp): #zero or more res = parser(inp) if isinstance(res, Left): return "", inp else: firstval, restinpafterfirst = res.val subseqvals, remaining = parse_zero_or_more(parser, restinpafterfirst) values = firstval if subseqvals: if isinstance(firstval, str): values = firstval+subseqvals elif isinstance(firstval, list): values = firstval+ ([subseqvals] if isinstance(subseqvals, str) else subseqvals) return values, remainingdef 
many(parser): def curried(s): return Right(parse_zero_or_more(parser,s)) return Parser(curried, 'many')def many1(parser): def curried(s): res = run_parser(parser, s) if isinstance(res, Left): return res else: return run_parser(many(parser), s) return Parser(curried, 'many1')def optionally(parser): noneparser = Parser(lambda x: Right( (Nothing(), ""))) return or_else(parser, noneparser)def sep_by1(sep, parser): sep_then_parser = sep >> parser return parser >> many(sep_then_parser)def sep_by(sep, parser): return (sep_by1(sep, parser) | Parser(lambda x: Right( ([], "")), 'sep_by'))def forward(parsergeneratorfn): def curried(s): return parsergeneratorfn()(s) return curriedletter = any_of(string.ascii_letters)letter.tag = 'letter'lletter = any_of(string.ascii_lowercase)lletter.tag = 'lletter'uletter = any_of(string.ascii_uppercase)uletter.tag = 'uletter'digit = any_of(string.digits)digit.tag = 'digit'digits = many1(digit)digits.tag = 'digits'whitespace = any_of(string.whitespace)whitespace.tag = 'whitespace'ws = whitespace.suppress()ws.tag = 'ws'letters = many1(letter)letters.tag = 'letters'word = lettersword.tag = 'word'alphanumword = many(letter >> (letters|digits))alphanumword.tag = 'alphanumword'num_as_int = digits.map(lambda l: int("".join(l)))num_as_int.tag = 'num_as_int'between = lambda p1, p2 , p3 : p1 >> p2 >> p3surrounded_by = lambda surparser, contentparser: surparser >> contentparser >> surparserquotedword = surrounded_by( (char('"')|char("'")).suppress() , word)quotedword.tag = 'quotedword'option = optionallyoption.tag = 'optionally'# commasepareted_p = sep_by(char(",").suppress(), many1(word) | many1(digit) | many1(quotedword))commaseparated_of = lambda p: sep_by(char(",").suppress(), many(p))xxxxxxxxxxwith open('build/myparsec.py', 'w+') as f: src = rewrite(VARS['myparsec_src'], original='myparsec.py') print(src, file=f)xxxxxxxxxx%%var parsec_srcimport stringimport jsonimport sysimport myparsec as pyparsecalphap = pyparsec.char('a')alphap.tag = 
'alphap'eqp = pyparsec.char('=')eqp.tag = 'eqp'digitp = pyparsec.digitsdigitp.tag = 'digitp'abcparser = alphap >> eqp >> digitpabcparser.tag = 'abcparser'def main(arg): abcparser.parse(arg)xxxxxxxxxxparsec_samples = ['a=0']xxxxxxxxxxdef accio_tree(fname, src, samples, restrict=True): program_src[fname] = src with open('subjects/%s' % fname, 'w+') as f: print(src, file=f) resrc = rewrite(src, fname) if restrict: resrc = resrc.replace('restrict = {\'files\': [sys.argv[0]]}', 'restrict = {}') with open('build/%s' % fname, 'w+') as f: print(resrc, file=f) os.makedirs('samples/%s' % fname, exist_ok=True) sample_files = {("samples/%s/%d.csv"%(fname,i)):s for i,s in enumerate(samples)} for k in sample_files: with open(k, 'w+') as f: print(sample_files[k], file=f) call_trace = [] for i in sample_files: my_tree = do(["python", "./build/%s" % fname, i]).stdout call_trace.append(json.loads(my_tree)[0]) mined_tree = miner(call_trace) generalized_tree = generalize_iter(mined_tree) return generalized_treexxxxxxxxxxparsec_trees = accio_tree('parsec.py', VARS['parsec_src'], parsec_samples)xxxxxxxxxxzoom(display_tree(parsec_trees[0]['tree'], extract_node=extract_node_o))xxxxxxxxxx%%var peg_srcimport reRE_NONTERMINAL = re.compile(r'(<[^<> ]*>)')def canonical(grammar, letters=False): def split(expansion): if isinstance(expansion, tuple): expansion = expansion[0] return [token for token in re.split(RE_NONTERMINAL, expansion) if token] def tokenize(word): return list(word) if letters else [word] def canonical_expr(expression): return [token for word in split(expression) for token in ([word] if word in grammar else tokenize(word))] return {k: [canonical_expr(expression) for expression in alternatives] for k, alternatives in grammar.items()}def crange(character_start, character_end): return [chr(i) for i in range(ord(character_start), ord(character_end) + 1)]def unify_key(grammar, key, text, at=0): if key not in grammar: if text[at:].startswith(key): return at + len(key), (key, []) else: 
return at, None for rule in grammar[key]: to, res = unify_rule(grammar, rule, text, at) if res: return (to, (key, res)) return 0, Nonedef unify_rule(grammar, rule, text, at): results = [] for token in rule: at, res = unify_key(grammar, token, text, at) if res is None: return at, None results.append(res) return at, resultsimport stringVAR_GRAMMAR = { '<start>': ['<assignment>'], '<assignment>': ['<identifier>=<expr>'], '<identifier>': ['<word>'], '<word>': ['<alpha><word>', '<alpha>'], '<alpha>': list(string.ascii_letters), '<expr>': ['<term>+<expr>', '<term>-<expr>', '<term>'], '<term>': ['<factor>*<term>', '<factor>/<term>', '<factor>'], '<factor>': ['+<factor>', '-<factor>', '(<expr>)', '<identifier>', '<number>'], '<number>': ['<integer>.<integer>', '<integer>'], '<integer>': ['<digit><integer>', '<digit>'], '<digit>': crange('0', '9')}def main(arg): C_VG = canonical(VAR_GRAMMAR) unify_key(C_VG, '<start>', arg)xxxxxxxxxxpeg_samples = [ 'a=0',]xxxxxxxxxxpeg_trees = accio_tree('peg.py', VARS['peg_src'], peg_samples, False)xxxxxxxxxxzoom(display_tree(peg_trees[0]['tree'], extract_node=extract_node_o))
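As a sanity check outside the tracer, the `unify_key`/`unify_rule` pair from `peg_src` can be run directly to see the parse tree the PEG parser builds. Below is a condensed copy of that code with a smaller, illustrative grammar (the tree shape matches what `display_tree` renders):

```python
import re
import string

RE_NONTERMINAL = re.compile(r'(<[^<> ]*>)')

def canonical(grammar):
    """Split each expansion string into terminal/nonterminal tokens."""
    return {k: [[t for t in re.split(RE_NONTERMINAL, alt) if t]
                for alt in alts]
            for k, alts in grammar.items()}

def unify_key(grammar, key, text, at=0):
    """Try to derive a prefix of text[at:] from key; return (pos, tree)."""
    if key not in grammar:
        if text[at:].startswith(key):
            return at + len(key), (key, [])
        return at, None
    for rule in grammar[key]:
        to, res = unify_rule(grammar, rule, text, at)
        if res is not None:
            return to, (key, res)
    return 0, None

def unify_rule(grammar, rule, text, at):
    results = []
    for token in rule:
        at, res = unify_key(grammar, token, text, at)
        if res is None:
            return at, None
        results.append(res)
    return at, results

GRAMMAR = canonical({
    '<start>': ['<id>=<digit>'],
    '<id>': list(string.ascii_lowercase),
    '<digit>': list(string.digits),
})

to, tree = unify_key(GRAMMAR, '<start>', 'a=0')
print(tree)
# ('<start>', [('<id>', [('a', [])]), ('=', []), ('<digit>', [('0', [])])])
```

Each node is a `(symbol, children)` pair, with terminals carrying empty child lists; this is the structure Mimid recovers by tracing the parser's execution rather than by calling it directly.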